Theory-driven Measurement, Analysis, and Archiving
Authors
Affiliations
Arkadiusz Białek*
Jagiellonian University, Poland
Wim Pouw*
Tilburg University, Netherlands
James Trujillo
University of Amsterdam, Netherlands
Fred Hasselman
Radboud University, Netherlands
Babajide Alamu Owoyele
Hasso Plattner Institute, University of Potsdam, Germany
Natalia Siekiera
Jagiellonian University, Poland
Joanna Rączaszek-Leonardi
University of Warsaw, Poland
Travis J. Wiltshire
Tilburg University, Netherlands
Published
December 16, 2025
* Shared first author.
1 Overview
To increase the reproducibility of our approach, we have designed this Quarto notebook to provide the user with what is effectively a manual-style extended methods and results section for two cases that follow the pipeline described in Białek, Pouw, et al. (under review). Next to staging and (visually) explaining key Python and R code elements, the notebook contains detailed further “Do It Yourself” (DIY) instructions that can be accessed on demand by clicking on DIY sections, for example providing the technical steps needed to install and then run code for YOLO tracking on (user-defined) data. This notebook is therefore not simply an add-on supplemental text; it is a key part of our contribution, as it allows newcomers to social signal processing to start implementing this pipeline step by step.
The Quarto notebook is structured to have the following components, next to traditional text sections:
🚩 pipeline code blocks that show the complete code associated with a pipeline step, or shorter code chunks that, for example, explain a particular function. For the sake of efficiency, these code blocks are generally not run inside the current notebook. Denoted with the symbol 🚩.
🏴 other code chunks that we use in this Quarto notebook to create sanity checks or animations related to measures, or to summarize datasets. These code blocks are generally run inside the notebook. Denoted with the symbol 🏴.
📊 example output blocks that show example output of the pipeline. Denoted with the symbol 📊.
🛠️ DIY blocks that provide shorter, step-by-step links to the full code for executing a particular step in the pipeline on your own data. Denoted with the symbol 🛠️.
⬇️ dataset download blocks that provide information about how to download the datasets overviewed in our report. Denoted with the symbol ⬇️.
What the Quarto notebook is not intended to do: This notebook is not itself the pipeline (i.e., a collection of code that runs the complete pipeline). It is rather a linear step-by-step manual and explainer of how to understand and implement the pipeline steps. For actually applying this code, the user can further dig into the DIY blocks, assembling the pieces that they wish to use, and adjusting as needed.
1.0.1 What skills do I need?
In principle, if you know how to run a Python or R script, you should be able to reproduce the pipeline on your own data. However, being able to run something should be the start of learning about the topics covered in this pipeline. To fully understand the pipeline, some programming skills in Python and R are helpful, especially if you want to adjust things flexibly to your own data. Some basic understanding of common time series operations, such as differentiation and smoothing, and of general kinematic variables is also valuable background (e.g., check the first few chapters of Winter (2009), or even just start with the Khan Academy kinematics unit). A solid mathematical background is not required, but some basic understanding of linear algebra and calculus is further helpful. For the non-linear analyses, some understanding of the underpinnings of dynamical systems is helpful (which in turn may require some basic familiarity with systems of differential equations to fully appreciate).
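To give a flavor of the kind of time series operations involved, here is a minimal sketch of numerical differentiation to obtain speed from x, y position data. The position values and frame rate below are made up for illustration; they are not from the pipeline's data.

```python
import numpy as np

# hypothetical x, y positions (pixels) sampled at 25 fps
fps = 25
x = np.array([0.0, 1.0, 3.0, 6.0, 10.0])
y = np.array([0.0, 0.0, 1.0, 1.0, 2.0])

# differentiate: frame-to-frame displacement along each axis
dx = np.diff(x)
dy = np.diff(y)

# speed = Euclidean norm of the displacement, scaled to units per second
speed = np.sqrt(dx**2 + dy**2) * fps

print(speed)  # one value per frame transition
```

Higher-order kinematic variables (acceleration, jerk) follow by differentiating again, which is also why smoothing matters: each differentiation step amplifies measurement noise.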
1.0.1.1 Notes on tested system specifications
Our pipeline has been tested on what amounts to modern (as of 2025) PC gaming desktops with GPU support. In principle, all code can be run without a GPU, but pose tracking at step 1 will then take considerably more time (by an estimated factor of 5 to 10). Steps 1 to 4 were performed on a PC equipped with a 12th Gen Intel(R) Core(TM) i9-12900KF, NVIDIA GeForce RTX 4090 GPU, and 64 GB RAM running Windows 10 Home. Masking at step 5 was performed on a Dell XPS 17 9710 system equipped with an Intel Core i9-11980HK processor, NVIDIA GeForce RTX 3060 Laptop GPU, and 64 GB RAM running Windows 11 Home (Build 26100).
2 Datasets for our cases
Below is a description of the two datasets that were used as test cases for the pipeline. The datasets are available in masked form to reduce the identifiability of the individuals in the dataset. Follow the download guide for further information on how to access the datasets.
Download Guide ⬇️
The masked data can be downloaded from the repository of Jagiellonian University (FORTHCOMING AFTER REVIEW). The unmasked data can be requested from the authors.
Please note that the masked video data are not the input of the pipeline. YOLO tracking would not work well on masked videos, as detailed information about the body is lost and is only represented in the motion tracking data that accompanies the videos. This motion tracking data is present in the repository and was used as input for the pipeline. The masked data can be used to check for qualitative changes in the behavior of the individuals, and to create animations such as in step 1.3.
Example of masked video data
2.0.1 Case 1: Dataset siblings
Ten sibling pairs each were observed from a Polish urban culture and the Yurakaré indigenous culture (total = 20 pairs). Observations were made during a semi-structured play situation: in the natural environment of the Yurakaré community, and under laboratory conditions in Poland. Siblings were asked to build a tower together using 54 wooden blocks while sitting facing each other. Each pair had 4 minutes to complete the task. This setup allowed for a systematic comparison of coordinated and mutually adjusted movements in siblings, using our pipeline to examine the smoothness of interaction while “doing things together” in these two cultural groups.
Plotting code for the Siblings dataset 🏴
Code
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

# Load the data
df = pd.read_csv('./meta/project_siblings_metadata_ages_gender.csv')

# Data preprocessing
df['AgeDifference'] = abs(df['P1agemonths'] - df['P2agemonths'])
df['AverageAge'] = (df['P1agemonths'] + df['P2agemonths']) / 2
df['GenderCombo'] = df['GenderP1'] + '-' + df['GenderP2']

# Create a 2x2 subplot layout
fig, axs = plt.subplots(2, 2, figsize=(10, 8),
                        gridspec_kw={'width_ratios': [1.5, 1], 'height_ratios': [1.5, 1]})
fig.suptitle('Sibling Dataset Overview', fontsize=16, fontfamily='serif')

# Define colors for gender combinations
colors = {
    'M-M': '#4472C4',  # Blue
    'M-F': '#7030A0',  # Purple
    'F-M': '#548235',  # Green
    'F-F': '#C55A11'   # Rust/Orange
}

# 1. Scatter plot - P1 age vs P2 age
for gender_combo, group in df.groupby('GenderCombo'):
    axs[0, 0].scatter(
        group['P1agemonths'],
        group['P2agemonths'],
        color=colors.get(gender_combo, 'gray'),
        alpha=0.7,
        label=gender_combo,
        s=50
    )

# Add reference line
min_age = min(df['P1agemonths'].min(), df['P2agemonths'].min())
max_age = max(df['P1agemonths'].max(), df['P2agemonths'].max())
axs[0, 0].plot([min_age, max_age], [min_age, max_age], 'k--', alpha=0.3)
axs[0, 0].set_xlabel('P1 Age (months)', fontfamily='serif')
axs[0, 0].set_ylabel('P2 Age (months)', fontfamily='serif')
axs[0, 0].set_title('Participant Ages (P1 vs P2)', fontfamily='serif')
axs[0, 0].grid(True, alpha=0.3)
axs[0, 0].legend()

# 2. Bar chart - Age differences
bar_positions = np.arange(len(df))
bar_width = 0.8
axs[0, 1].bar(
    bar_positions,
    df['AgeDifference'],
    width=bar_width,
    color=[colors.get(combo, 'gray') for combo in df['GenderCombo']]
)
axs[0, 1].set_xticks(bar_positions)
axs[0, 1].set_xticklabels(df['Code'], rotation=90)
axs[0, 1].set_xlabel('Sibling Pair', fontfamily='serif')
axs[0, 1].set_ylabel('Age Difference (months)', fontfamily='serif')
axs[0, 1].set_title('Age Differences by Sibling Pair', fontfamily='serif')
axs[0, 1].grid(True, alpha=0.3, axis='y')

# 3. Pie chart - Gender distribution
gender_counts = df['GenderCombo'].value_counts()
axs[1, 0].pie(
    gender_counts,
    labels=gender_counts.index,
    autopct='%1.1f%%',
    colors=[colors.get(combo, 'gray') for combo in gender_counts.index],
    wedgeprops={'edgecolor': 'w', 'linewidth': 1}
)
axs[1, 0].set_title('Gender Distribution', fontfamily='serif')

# 4. Summary table
summary_data = [
    ["Total Sibling Pairs", f"{len(df)}"],
    ["Average P1 Age", f"{df['P1agemonths'].mean():.1f} months"],
    ["Average P2 Age", f"{df['P2agemonths'].mean():.1f} months"],
    ["Average Age Difference", f"{df['AgeDifference'].mean():.1f} months"],
    ["Number of Males", f"{(df['GenderP1'] == 'M').sum() + (df['GenderP2'] == 'M').sum()}"],
    ["Number of Females", f"{(df['GenderP1'] == 'F').sum() + (df['GenderP2'] == 'F').sum()}"]
]

# Turn off axis for table
axs[1, 1].axis('off')
table = axs[1, 1].table(
    cellText=summary_data,
    colLabels=["Measure", "Value"],
    loc='center',
    cellLoc='left'
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
axs[1, 1].set_title('Dataset Summary', fontfamily='serif')

plt.tight_layout()
plt.subplots_adjust(top=0.9)

# Save the figure to a file
output_path = "images/sibling_analysis.png"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
2.0.2 Case 2: Dataset mother-infant
Thirteen infant-caregiver pairs visited the child laboratory monthly for a total of seven visits. The infants’ ages at the first visit ranged from 7 to 9 months. Parents were instructed to “play as usual,” with a set of different toys to encourage spontaneous interactions. Each visit lasted approximately 30 minutes and was divided into four stages. For our analyses, we focus on recordings from the stage where the infant was seated in a feeding chair, facing the caregiver sitting directly across. This controlled setup minimizes movement around the room, facilitating video motion tracking analysis. Additionally, our analyses are restricted to top-view recordings, ensuring that both the caregiver and the infant are captured within a single frame.
Plotting code for the Caregiver-Infant dataset 🏴
Code
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
plt.ioff()
plt.clf()  # Clear any existing figures
import pandas as pd
import numpy as np
import os

# Load the data
df = pd.read_csv('./meta/project_point_metadata_ages.csv')

# Convert gender to categorical label
df['gender_label'] = df['gender'].map({0: 'Female', 1: 'Male'})

# Create a 2x2 subplot layout
fig, axs = plt.subplots(2, 2, figsize=(12, 10),
                        gridspec_kw={'width_ratios': [1.5, 1], 'height_ratios': [1.5, 1]})
fig.suptitle('Caregiver-Infant Dataset Overview', fontsize=16, fontfamily='serif')

# Define colors for gender
colors = {
    'Female': '#C55A11',  # Orange for females
    'Male': '#4472C4'     # Blue for males
}

# 1. Line plot - Age progression across time points
for idx, row in df.iterrows():
    gender = row['gender_label']
    # Create arrays for days and time points, handling NaN values
    days = []
    timepoints = []
    for i in range(1, 8):  # T1 to T7
        day_col = f'age_T{i}_days'
        if day_col in df.columns and pd.notna(row[day_col]) and row[day_col] > 0:
            days.append(row[day_col])
            timepoints.append(i)
    # Plot the line for this infant
    axs[0, 0].plot(
        timepoints,
        days,
        marker='o',
        color=colors[gender],
        alpha=0.5,
        linewidth=1.5
    )

# Customize the plot
axs[0, 0].set_xlabel('Time Point', fontfamily='serif')
axs[0, 0].set_ylabel('Age (days)', fontfamily='serif')
axs[0, 0].set_title('Infant Age Progression', fontfamily='serif')
axs[0, 0].set_xticks(range(1, 8))
axs[0, 0].set_xticklabels([f'T{i}' for i in range(1, 8)])
axs[0, 0].grid(True, alpha=0.3)

# Add legend patches
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor=colors['Female'], edgecolor='w', label='Female'),
    Patch(facecolor=colors['Male'], edgecolor='w', label='Male')
]
axs[0, 0].legend(handles=legend_elements, loc='upper left')

# 2. Violin plot - Age distribution at each time point
time_point_data = []
labels = []
for i in range(1, 8):  # T1 to T7
    col = f'age_T{i}_days'
    if col in df.columns:
        valid_data = df[col].dropna()
        if len(valid_data) > 0:
            time_point_data.append(valid_data)
            labels.append(f'T{i}')

# Create violin plots
if time_point_data:
    violin_parts = axs[0, 1].violinplot(
        time_point_data,
        showmeans=True,
        showmedians=True
    )
    # Color the violin plots
    for i, pc in enumerate(violin_parts['bodies']):
        pc.set_facecolor('#7030A0')  # Purple
        pc.set_edgecolor('black')
        pc.set_alpha(0.7)

# Set labels
axs[0, 1].set_xticks(range(1, len(labels) + 1))
axs[0, 1].set_xticklabels(labels)
axs[0, 1].set_xlabel('Time Point', fontfamily='serif')
axs[0, 1].set_ylabel('Age (days)', fontfamily='serif')
axs[0, 1].set_title('Age Distribution by Time Point', fontfamily='serif')
axs[0, 1].grid(True, alpha=0.3, axis='y')

# 3. Pie chart - Gender distribution
gender_counts = df['gender_label'].value_counts()
axs[1, 0].pie(
    gender_counts,
    labels=gender_counts.index,
    autopct='%1.1f%%',
    colors=[colors[g] for g in gender_counts.index],
    wedgeprops={'edgecolor': 'w', 'linewidth': 1}
)
axs[1, 0].set_title('Gender Distribution', fontfamily='serif')

# 4. Summary table
# Calculate averages for each time point, handling NaN values
averages_days = []
for i in range(1, 8):
    col = f'age_T{i}_days'
    if col in df.columns:
        avg = df[col].mean()
        if not pd.isna(avg):
            avg_months = avg / 30.44  # Convert to months
            averages_days.append([f"T{i} Average Age", f"{avg:.1f} days ({avg_months:.1f} months)"])

# Count participants at each time point
participants = []
for i in range(1, 8):
    col = f'age_T{i}_days'
    if col in df.columns:
        count = df[col].notna().sum()
        participants.append([f"Participants at T{i}", f"{count}"])

# Overall statistics
summary_data = [
    ["Total Infants", f"{len(df)}"],
    ["Females", f"{(df['gender'] == 0).sum()}"],
    ["Males", f"{(df['gender'] == 1).sum()}"]
]

# Add age ranges if columns exist
if 'age_T1_days' in df.columns:
    summary_data.append(["Age Range T1",
                         f"{df['age_T1_days'].min():.0f}-{df['age_T1_days'].max():.0f} days"])

last_tp = 7
while last_tp > 0:
    col = f'age_T{last_tp}_days'
    if col in df.columns and df[col].notna().sum() > 0:
        summary_data.append([f"Age Range T{last_tp}",
                             f"{df[col].min():.0f}-{df[col].max():.0f} days"])
        break
    last_tp -= 1

# Age means
if 'age_T1_days' in df.columns:
    summary_data.append(["Mean Age T1",
                         f"{df['age_T1_days'].mean():.1f} days ({df['age_T1_days'].mean()/30.44:.1f} months)"])
if last_tp > 0:
    col = f'age_T{last_tp}_days'
    if col in df.columns:
        summary_data.append([f"Mean Age T{last_tp}",
                             f"{df[col].mean():.1f} days ({df[col].mean()/30.44:.1f} months)"])

# Create the table with the most important summary statistics
axs[1, 1].axis('off')
table = axs[1, 1].table(
    cellText=summary_data + participants,
    loc='center',
    cellLoc='left'
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
axs[1, 1].set_title('Dataset Summary', fontfamily='serif')

plt.tight_layout()
plt.subplots_adjust(top=0.9)

# Save the figure to a file
output_path = "images/mother_infant_analysis.png"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close(fig)  # Explicitly close the figure object
Note that in our GitHub repository we use short sample data from this dataset, from a caregiver-infant pair whose video data we are allowed to share for the current purposes.
2.1 DIY: Setting up the repository on your local machine 🛠️
2.1.1 Step 0: Github repo
You should first clone the repository InterPerDynPipeline and move to the root directory. The step 1 code for tracking is in the ./code_STEP1_posetrackingprocessing/ folder. In this folder you will find the Python script yolo_tracking_processing.py. To check whether you have the correct script you can compare against the code chunk provided below.
First install git if you do not have it installed yet. You can find instructions on how to install git here.
Then you can clone the repository using the following commands in your (anaconda) terminal:
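A minimal sketch of those commands, assuming the repository is hosted on GitHub (the account name below is a placeholder, not the actual URL of the repository):

```shell
# replace <ACCOUNT> with the GitHub account that hosts the repository
git clone https://github.com/<ACCOUNT>/InterPerDynPipeline.git
cd InterPerDynPipeline
```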
3 Step 1: Motion tracking, signal processing, and wrangling
The motion tracking and signal processing pipeline is divided into three steps:
1. Tracking: This step involves tracking the movements of individuals in the videos using YOLOv8. The output is a video with annotated keypoints and a CSV file containing the coordinates of these keypoints.
2. Signal processing: This step involves processing the keypoint data to extract relevant features for analysis. The outputs are processed timeseries files and a flat dataset containing our smoothness measures, directly ready for statistical analysis.
3. Animations: This step involves animating the processed data together with the original video data to ensure that the processed data are of sufficient quality to reflect behavioral changes.
3.1 Step 1.1: Tracking two persons in videos for top-view (Python)
We have tried several pose tracking solutions, such as OpenPose (model 25B; Cao et al. (2021)) and Mediapipe (default highest-complexity BlazePose model; Lugaresi et al. (2019)). We found that these models are not well equipped to track persons from a top view, most likely because they are not trained on ground-truth poses of persons from top-view camera angles. However, we found that the heaviest YOLOv8 model (“yolov8x-pose-p6.pt”) does perform very well, especially compared to the other models we tested. Thus, for this pipeline we recommend using the YOLOv8 model (Jocher, Chaurasia, and Qiu 2023) for top-view tracking. Always check the tracking quality of the model that you use on your own dataset; YOLOv8 may not be the best fit for all datasets.
There has further been work comparing YOLOv8 for human pose estimation against gold-standard motion tracking systems. For example, Giulietti et al. (2025) showed that the model we use here, yolov8x-pose-p6.pt (69.4M parameters), has an 18.2 mm mean per-joint position error (MPJPE) when incorporated in a calibrated 4-camera system, compared to an OptiTrack marker-based optical motion capture system (12 cameras). The lowest errors were for the upper body (nose, shoulders, elbows, wrists), which are the keypoints we use in our pipeline. Araujo et al. (2025) performed a similar comparison with OptiTrack to assess YOLOv8 (the exact model was unspecified), showing that the orientation of the torso and head reached > 70% accuracy at a distance of 300-500 cm from the camera as compared to the gold standard. Do note that these satisfactory performance levels were achieved under different conditions than ours: we have a top-view setup, whereas these tests were performed on multi-camera eye-level side and frontal camera setups. There is thus further room to compare the performance of YOLOv8 against gold-standard motion tracking systems for top-view setups.
DIY Step 1.1 Motion Tracking Set-Up and Execution 🛠️
3.1.1 Step 1: Install requirements
For each script we have provided a requirements.txt file. You can install the requirements using pip, by first navigating to the folder where the script is located and then running the following command in your terminal:
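For example, for the step 1 tracking script, a sketch assuming the folder layout described in this notebook:

```shell
# navigate to the folder containing the script and its requirements.txt
cd ./code_STEP1_posetrackingprocessing/
# install the dependencies listed for this script
pip install -r requirements.txt
```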
In the current GitHub repo we include a lightweight model yolov8n-pose.pt (~6.5MB) for immediate testing of the code. However, this model is not as accurate as the heaviest model.
You should download the heaviest model (yolov8x-pose-p6.pt) and save it to the ./code_STEP1_posetrackingprocessing/model/ directory:
https://github.com/ultralytics/assets/releases/download/v8.1.0/yolov8x-pose-p6.pt
Note that if you’re running the heavyweight model, you will need GPU support for this to run in a reasonable time (we processed our data on a NVIDIA GeForce RTX 4090 GPU). The lightweight model can run on CPU.
3.1.3 Step 3: Run the python script
Assuming you have videos in your data folder (./data_raw/) and you have a .pt model in the model folder, you can run the script using the following command:
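A minimal sketch of that invocation (assuming no extra command-line arguments are needed; the paths to the data and model folders are set inside the script):

```shell
cd ./code_STEP1_posetrackingprocessing/
python yolo_tracking_processing.py
```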
The code below is the script yolo_tracking_processing.py that you can run to track the videos. The output will be a video with annotated keypoints and a CSV file containing the coordinates of these keypoints. The script processes each video frame by frame, detecting people and their pose keypoints (17 points, as listed in the script; see the GetKeypoint class). We filter out duplicate detections and skeletons with excessive missing data. Specifically, for each processed video the script generates two output files: an annotated video showing the original footage with skeleton overlays (green points for accepted keypoints, red for filtered ones, and blue lines connecting the points), and a CSV file containing the raw coordinate data (frame number, person ID, keypoint ID, x-coordinate, y-coordinate). The output filenames include parameters like “c150” (150px proximity threshold) and “miss95” (95% missing data tolerance), which control how the system handles potential duplicate detections and incomplete skeletons. The user can adjust the parameters close_threshold and miss_tolerance to adjust the detection dynamics. We output the annotated video and the CSV file to the ./datatracked_afterSTEP1/ folder.
Pipeline code 1.1 YOLO tracking 🚩
Code
from ultralytics import YOLO
from pydantic import BaseModel
import cv2
import csv
import numpy as np
import glob
import os
import torch  # for gpu support
from itertools import combinations
import sys

torch.cuda.set_device(0)

# Load the model
modelfolder = './model/'
modellocation = glob.glob(modelfolder + "*.pt")[0]
modelfile = os.path.basename(modellocation)
print(f"We are loading in the following YOLO model: {modelfile}")
model = YOLO(modellocation)

# main variables
video_folder = "../data_raw/"  # avi mp4 or other video formats
video_files = (glob.glob(video_folder + "*.mp4") + glob.glob(video_folder + "*.avi")
               + glob.glob(video_folder + "*.mov") + glob.glob(video_folder + "*.mkv"))
step1resultfolder = "../datatracked_afterSTEP1/"
print(video_files)

# keypoint names
class GetKeypoint(BaseModel):
    NOSE: int = 0
    LEFT_EYE: int = 1
    RIGHT_EYE: int = 2
    LEFT_EAR: int = 3
    RIGHT_EAR: int = 4
    LEFT_SHOULDER: int = 5
    RIGHT_SHOULDER: int = 6
    LEFT_ELBOW: int = 7
    RIGHT_ELBOW: int = 8
    LEFT_WRIST: int = 9
    RIGHT_WRIST: int = 10
    LEFT_HIP: int = 11
    RIGHT_HIP: int = 12
    LEFT_KNEE: int = 13
    RIGHT_KNEE: int = 14
    LEFT_ANKLE: int = 15
    RIGHT_ANKLE: int = 16

get_keypoint = GetKeypoint()

# Define skeleton connections
skeleton = [
    (get_keypoint.LEFT_SHOULDER, get_keypoint.RIGHT_SHOULDER),
    (get_keypoint.LEFT_SHOULDER, get_keypoint.LEFT_ELBOW),
    (get_keypoint.RIGHT_SHOULDER, get_keypoint.RIGHT_ELBOW),
    (get_keypoint.LEFT_ELBOW, get_keypoint.LEFT_WRIST),
    (get_keypoint.RIGHT_ELBOW, get_keypoint.RIGHT_WRIST),
    (get_keypoint.LEFT_SHOULDER, get_keypoint.LEFT_HIP),
    (get_keypoint.RIGHT_SHOULDER, get_keypoint.RIGHT_HIP),
    (get_keypoint.LEFT_HIP, get_keypoint.RIGHT_HIP),
    (get_keypoint.LEFT_HIP, get_keypoint.LEFT_KNEE),
    (get_keypoint.RIGHT_HIP, get_keypoint.RIGHT_KNEE),
    (get_keypoint.LEFT_KNEE, get_keypoint.LEFT_ANKLE),
    (get_keypoint.RIGHT_KNEE, get_keypoint.RIGHT_ANKLE),
]

def tensor_to_matrix(results_tensor):
    # this just takes the results output of YOLO and converts it to a matrix,
    # making it easier to do quick calculations on the coordinates
    results_list = results_tensor.tolist()
    results_matrix = np.matrix(results_list)
    results_matrix[results_matrix == 0] = np.nan
    return results_matrix

def check_for_duplication(results):
    # this threshold determines how close two skeletons must be in order to be
    # considered the same person. Arbitrarily chosen for now.
    close_threshold = 150  # note that the value is in pixels
    # missing data tolerance
    miss_tolerance = 0.95  # this means we can miss up to 95% of the keypoints
    drop_indices = []
    if len(results[0].keypoints.xy) > 1:  # only proceed if more than one skeleton is detected
        conf_scores = []
        # get detection confidence for each skeleton
        for person in tensor_to_matrix(results[0].keypoints.conf):
            conf_scores.append(np.mean(person))
        # this list stores which comparisons need to be made
        combos = list(combinations(range(len(results[0].keypoints.xy)), 2))
        # now loop through these comparisons
        for combo in combos:
            closeness = abs(np.nanmean(tensor_to_matrix(results[0].keypoints.xy[combo[0]])
                                       - tensor_to_matrix(results[0].keypoints.xy[combo[1]])))
            # if any of them indicate that two skeletons are very close together,
            # we keep the one with higher tracking confidence, and remove the other
            if closeness < close_threshold:
                conf_list = [conf_scores[combo[0]], conf_scores[combo[1]]]
                idx_min = conf_list.index(min(conf_list))
                drop_indices.append(combo[idx_min])
        # additional checks:
        for person in range(len(results[0].keypoints.xy)):
            keypoints_missed = np.isnan(tensor_to_matrix(results[0].keypoints.xy[person])).sum() / 2
            # calculate percentage of missing keypoints
            perc_missed = keypoints_missed / len(tensor_to_matrix(results[0].keypoints.xy[person]))
            # compare % of missing keypoints against the threshold that we set earlier
            if perc_missed > miss_tolerance:
                drop_indices.append(person)
    return list(set(drop_indices))

for video_path in video_files:
    # only process if the output is not there yet
    if os.path.exists(step1resultfolder + os.path.basename(video_path).split('.')[0]
                      + "_annotated_layer1_c150_miss95.mp4"):
        print(f"Output video already exists for {video_path}. Skipping...")
        continue
    # vidname without extension
    vidname = os.path.basename(video_path).split('.')[0]
    # Open the video
    cap = cv2.VideoCapture(video_path)
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Define the output video writer
    output_path = step1resultfolder + vidname + "_annotated_layer1_c150_miss95.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # Prepare CSV file
    csv_path = step1resultfolder + vidname + '_keypoints_data_layer1.csv'
    csv_file = open(csv_path, 'w', newline='')
    csv_writer = csv.writer(csv_file)
    # Write header
    header = ['frame', 'person', 'keypoint', 'x', 'y']
    csv_writer.writerow(header)
    frame_count = 0
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        # Run YOLOv8 inference on the frame
        results = model(frame)
        annotated_frame = frame
        # write empty rows if no person is detected
        if len(results[0].keypoints.xy) == 0:
            csv_writer.writerow([frame_count, None, None, None, None])
        # only do this if a person is detected
        if len(results[0].keypoints.xy) > 0:
            # Process the results
            drop_indices = check_for_duplication(results)
            for person_idx, person_keypoints in enumerate(results[0].keypoints.xy):
                if person_idx not in drop_indices:
                    colourcode = (0, 255, 0)
                else:
                    colourcode = (255, 0, 0)
                for keypoint_idx, keypoint in enumerate(person_keypoints):
                    x, y = keypoint
                    # Write to CSV
                    csv_writer.writerow([frame_count, person_idx, keypoint_idx, x.item(), y.item()])
                    # Draw keypoint on the frame
                    cv2.circle(annotated_frame, (int(x), int(y)), 5, colourcode, -1)
                # Draw skeleton
                for connection in skeleton:
                    if connection[0] < len(person_keypoints) and connection[1] < len(person_keypoints):
                        start_point = tuple(map(int, person_keypoints[connection[0]]))
                        end_point = tuple(map(int, person_keypoints[connection[1]]))
                        if all(start_point) and all(end_point):  # Check if both points are valid
                            cv2.line(annotated_frame, start_point, end_point, (255, 0, 0), 2)
        # Write the frame to the output video
        out.write(annotated_frame)
        frame_count += 1
    # Release everything
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    csv_file.close()
    print(f"Output video saved as {output_path}")
    print(f"Keypoints data saved as {csv_path}")
Here is an example of the output video with annotated keypoints and skeletons. The green points indicate accepted keypoints, while the red points indicate filtered ones. The blue lines connect the keypoints to form a skeleton.
Figure S3. Example output video with annotated keypoints and skeletons. The green points indicate accepted keypoints. The blue lines connect the keypoints to form a skeleton.
Here is an example of the output csv file containing the coordinates of the keypoints. The CSV file contains the following columns: frame number, person ID, keypoint ID, x-coordinate, and y-coordinate. Each row corresponds to a detected keypoint in a specific frame.
Example output and code example for YOLO tracking: 📊
Code
import pandas as pd
import glob
import os

folderstep1output = "./dataoutput_STEP1_1_rawposedata/"
# Load the CSV file
csv_file = glob.glob(os.path.join(folderstep1output + "*.csv"))[0]
df = pd.read_csv(csv_file)
# Display the first few rows of the DataFrame
print(df.head())
3.2 Step 1.2: Raw pose data post-processing for timeseries matrices (Python)
The raw pose data with frame, person, keypoint, and x,y position data that we end up with is not yet in a format that is easy to work with for our purposes. Additionally, sometimes no person (or several persons) is detected in a frame. We want to transform these raw pose data into a timeseries matrix format, with time in seconds as one column and different time-varying variables in the other columns. Furthermore, we smooth and interpolate the data where necessary. The Python code below executes this second step, whereby we store the position data per person, and further derive interpersonal distance measures, kinematic metrics (e.g., speed), and tracking quality indicators (e.g., a tracking status that indicates where data were interpolated).
Next to determining time in seconds from the frame rate of the videos, the script starts by assigning left and right person IDs based on horizontal position in the frame (with left being ID 1 and right being ID 2). It then processes the time data to calculate statistics for tracked people using upper body keypoints (as the lower body keypoints are unreliable from a top view and may be ignored). Specifically, we reduce the dimensionality of all upper body keypoints by determining relative positions and their distances: we calculate a bounding box (determined by the outer edges of the keypoints) and determine its center in x and y coordinates (centroid_x, centroid_y) and the P1-P2 distance of those centroids (distance), as well as the center of mass (com_x, com_y) for each person and its distance (distance_com). Further, we determine the midpoint of the shoulders as another relevant position (shoulder_midpoint_x, shoulder_midpoint_y) and compute its distance (distance_shoulder_midpoint). Additionally, we retain information about body parts such as arm motions by taking the wrist positions (wrist_left_x, wrist_left_y, wrist_right_x, wrist_right_y), and further derive total motion per second, called speed (based on the Euclidean norm of the changes in position along x and y).
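As an illustration of the dimensionality reduction described above, here is a minimal sketch of computing a bounding-box centroid and a (unweighted) center of mass per person, and the interpersonal distance between centroids. The keypoint coordinates are made up; the actual script operates on the full per-frame keypoint tables.

```python
import numpy as np

# hypothetical upper-body keypoints (x, y) in pixels for two persons in one frame
p1 = np.array([[100, 120], [140, 118], [160, 150], [90, 155]], dtype=float)
p2 = np.array([[400, 130], [440, 128], [460, 160], [390, 165]], dtype=float)

def bbox_centroid(keypoints):
    # center of the bounding box spanned by the outer edges of the keypoints
    mins = np.nanmin(keypoints, axis=0)
    maxs = np.nanmax(keypoints, axis=0)
    return (mins + maxs) / 2

def center_of_mass(keypoints):
    # simple unweighted mean of the keypoint positions
    return np.nanmean(keypoints, axis=0)

c1, c2 = bbox_centroid(p1), bbox_centroid(p2)
com1 = center_of_mass(p1)
distance = np.linalg.norm(c1 - c2)  # Euclidean P1-P2 centroid distance
print(c1, c2, com1, distance)
```

The nan-aware functions (np.nanmin, np.nanmean) keep the reduction robust to individual keypoints that are missing in a given frame.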
Before calculating our primary measure of interest, which will be the object of our statistical analyses, we need to process the data in two important ways. First, we check for any remaining missing values after processing and confirm time continuity to detect any frame drops or temporal misalignments. The script handles missing data through two complementary techniques: linear interpolation for brief gaps in tracking (up to 5 frames), and taking the last known location for longer gaps (up to 20 frames).
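One simple approximation of that gap-handling logic can be sketched with pandas (the series below is toy data; the actual script applies this to the keypoint time series, with the 5- and 20-frame limits described above):

```python
import pandas as pd
import numpy as np

# toy position series with a short gap (2 frames) and a long gap (8 frames)
s = pd.Series([1.0, 2.0, np.nan, np.nan, 5.0,
               6.0, np.nan, np.nan, np.nan, np.nan,
               np.nan, np.nan, np.nan, np.nan, 15.0])

# 1) linear interpolation, filling at most 5 consecutive missing frames
filled = s.interpolate(method='linear', limit=5, limit_area='inside')

# 2) carry the last known location forward for what remains of longer gaps
#    (up to 20 frames)
filled = filled.ffill(limit=20)

print(filled.tolist())
```

Note that with this simple two-pass approach, the first frames of a long gap are still interpolated before the forward fill takes over; a stricter per-gap rule would require measuring each gap's length first.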
Second, we need to smooth the data. Motion tracking, even when quite accurate, is prone to small frame-to-frame fluctuations in position, even when the tracked object/person is not moving. In the case of pose tracking from top-down camera angles, there are considerable noise-related fluctuations in the keypoint estimates. When we calculate something like interpersonal proximity, these fluctuations will cause fluctuations in proximity. Noise especially amplifies in derived measures such as speed, acceleration, and jerk. In the provided pipeline, we use a Savitzky-Golay filter (implemented in SciPy; Virtanen et al. 2020). The filter slides along the time series, smoothing the data within a particular window length by fitting a polynomial of a pre-determined order to the datapoints. In our case, we set a time window of 11 datapoints and a third-degree polynomial. The plot below shows the effect of smoothing the speed data in this way. The script also calculates the speed of the center of mass and the wrist positions, and here too we smooth the values using the Savitzky-Golay filter to reduce noise that is amplified during differentiation (see Winter (2009)).
We recommend that users check the effect of smoothing on their own data, and adjust the window length and polynomial order to see whether the movement granularity that one is interested in is adequately represented in the time series (the animation code that we provide will help with checking whether one is smoothing too little or too much).
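One quick way to explore this trade-off is to compare window lengths on a toy signal with known ground truth. The sketch below is ours, not part of the pipeline; the sine signal and noise level are arbitrary choices, and note that `savgol_filter` requires an odd window length greater than the polynomial order.

```python
import numpy as np
from scipy.signal import savgol_filter

rng = np.random.default_rng(0)
t = np.linspace(0, 2 * np.pi, 200)
signal = np.sin(3 * t)                        # hypothetical "true" movement
noisy = signal + rng.normal(0, 0.2, t.size)   # jittered tracking estimate

# savgol_filter needs an odd window_length larger than polyorder
for window in (5, 11, 21, 41):
    smoothed = savgol_filter(noisy, window_length=window, polyorder=3)
    rmse = np.sqrt(np.mean((smoothed - signal) ** 2))
    print(f"window {window:>2}: RMSE vs. true signal = {rmse:.3f}")
```

Too small a window leaves jitter in; too large a window starts flattening genuine movement peaks, so the error is typically U-shaped in the window length.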
Example code for checking behavior of smoothing of time series to reduce noise-related jitter 🏴
Code
import pandas as pd
import numpy as np
import glob as glob
import os
import scipy.signal
import matplotlib.pyplot as plt

def normalize_array(arr):
    """Normalize an array to the range [0, 1]"""
    return (arr - np.min(arr)) / (np.max(arr) - np.min(arr))

folderstep12output = "./dataoutput_STEP1_2_timeseries/"

# Load the CSV file
csv_file = glob.glob(os.path.join(folderstep12output + "*.csv"))[0]
df = pd.read_csv(csv_file)

# get raw speed values
raw_speed = df['wrist_left_p1_speed'].values

# smooth the speed timeseries
speed = scipy.signal.savgol_filter(raw_speed, window_length=11, polyorder=3)

# comparison smoothing
extra_speed = scipy.signal.savgol_filter(raw_speed, window_length=21, polyorder=3)

plt.plot(normalize_array(raw_speed[0:150]))
plt.plot(normalize_array(speed[0:150]))
plt.plot(normalize_array(extra_speed[0:150]))
plt.title("Effect of smoothing on speed data using Savitzky-Golay filter")
plt.xlabel("Frames")
plt.legend(labels=["raw speed", "smoothed speed (window 11)", "extra smoothed speed (window 21)"])
#plt.show()

# save the plot
plt.savefig("images/smoothing_speed_effect.png")
plt.close()
Figure S4. Effect of smoothing on speed data using the Savitzky-Golay filter
The more sophisticated measure that we have created, which is the object of our later analyses, is the proximity calculation measure (see the function calculate_approach() in the step 2 script). This function establishes an initial reference position for both participants and then projects their subsequent movements onto the line connecting them. This allows for quantifying approach and retreat behaviors for each person separately (which a distance measure does not allow for) while accommodating relative angle changes between them: positive values indicate movement toward the other person (relative to the start), while negative values indicate withdrawal (relative to the start).
Below is an animation to exemplify what we are measuring. We capture only movements that change the distance between the two people. When someone moves in a circle around the other person, we want to make sure we do not register that as approach or retreat (which we would do if we just captured, for example, horizontal position). However, we also want to capture each person's own approach/avoidance value (which would not be possible with a simple distance measure). By always measuring the signed length of the projection of the displacement from the initial position onto the current connecting line, the method tracks cumulative approach/avoidance for each person over time.
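The projection logic can be condensed into a few lines of NumPy. The sketch below is a simplified stand-in for the projection idea, using hypothetical positions; `approach_values` is our illustrative helper, not the pipeline function.

```python
import numpy as np

def approach_values(p1_pos, p2_pos, ref_p1, ref_p2):
    """Signed displacement from each reference position, projected onto
    the *current* line connecting the two people."""
    connect = p2_pos - p1_pos
    direction = connect / np.linalg.norm(connect)      # unit vector P1 -> P2
    p1_approach = np.dot(p1_pos - ref_p1, direction)   # positive = toward P2
    p2_approach = np.dot(p2_pos - ref_p2, -direction)  # positive = toward P1
    return p1_approach, p2_approach

ref_p1, ref_p2 = np.array([3.0, 5.0]), np.array([7.0, 5.0])  # initial positions

# P1 steps 1 unit toward P2: approach is +1.0 for P1 and 0.0 for P2
print(approach_values(np.array([4.0, 5.0]), ref_p2, ref_p1, ref_p2))

# P2 steps 1 unit away from P1: approach is 0.0 for P1 and -1.0 (retreat) for P2
print(approach_values(ref_p1, np.array([8.0, 5.0]), ref_p1, ref_p2))
```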
Code for creating animation to exemplify behavior of proximity measure 🏴
Code
import numpy as npimport matplotlib.pyplot as pltfrom matplotlib.animation import FuncAnimationimport matplotlib.patches as patchesimport matplotlib.lines as mlinesimport osimport matplotlib# Force matplotlib to use Agg backend to avoid display issuesmatplotlib.use('Agg')# Create output directory if it doesn't existos.makedirs("images/proximity_visualization", exist_ok=True)# Simulation parametersnum_frames =100p1_color ='#3498db'# Bluep2_color ='#e74c3c'# Redvector_color ='#2ecc71'# Green# Create initial positions for two people - now on a horizontal line# Person 1 on the left, Person 2 on the right (same y-coordinate)p1_initial = np.array([3, 5])p2_initial = np.array([7, 5])# Generate movement paths with MORE EXTREME approach and avoidance# Person 1 will make a dramatic approach followed by retreat# Person 2 will make a clear retreat followed by approacht = np.linspace(0, 2*np.pi, num_frames)# Create more dramatic movement path for Person 1# Increase the amplitude of movement to make it more extremep1_path_x = p1_initial[0] +2.5* np.sin(t/2) # Larger horizontal movement (was 1.5)p1_path_y = p1_initial[1] +2.5* np.sin(t) # Larger vertical movement (was 2.0)# Create more dramatic movement path for Person 2# Make the retreat phase more obviousp2_path_x = p2_initial[0] -1.2* np.sin(t) # More horizontal movement (was 0.5)p2_path_y = p2_initial[1] -2.0* np.sin(t*1.5) # More vertical movement (was 1.0)# Store positions for each framep1_positions = np.column_stack((p1_path_x, p1_path_y))p2_positions = np.column_stack((p2_path_x, p2_path_y))# Arrays to store approach valuesp1_approach_values = np.zeros(num_frames)p2_approach_values = np.zeros(num_frames)# Calculate proximity approach valuesdef calculate_approach(positions1, positions2):"""Calculate approach values similar to the script's approach"""# Use first frame as reference reference_p1_pos = positions1[0].copy() reference_p2_pos = positions2[0].copy() p1_approach = np.zeros(len(positions1)) p2_approach = 
np.zeros(len(positions2))for i inrange(len(positions1)):# Current positions p1_pos = positions1[i] p2_pos = positions2[i]# Current connecting vector and direction current_connect = p2_pos - p1_pos current_distance = np.linalg.norm(current_connect)if current_distance >0: current_direction = current_connect / current_distance# Vectors from reference positions p1_vector = p1_pos - reference_p1_pos p2_vector = p2_pos - reference_p2_pos# Project onto connecting line p1_approach[i] = np.dot(p1_vector, current_direction)# For P2, a positive value should mean movement toward P1# So we use the negative connecting direction (from P2 to P1)# And a positive dot product means approaching p2_direction =-current_direction # Direction from P2 to P1 p2_approach[i] = np.dot(p2_vector, p2_direction)return p1_approach, p2_approach# Calculate approach valuesp1_approach_values, p2_approach_values = calculate_approach(p1_positions, p2_positions)# Create the main visualizationdef create_detailed_animation():# Set up the figure explicitly with DPI fig, ax = plt.subplots(figsize=(12, 10), dpi=100) # Larger figure for better visibilitydef init(): ax.clear() ax.set_xlim(0, 10) ax.set_ylim(0, 10) ax.set_aspect('equal')return []def animate(i): ax.clear()# Set up the axes ax.set_xlim(0, 10) ax.set_ylim(0, 10) ax.set_aspect('equal') ax.set_title('Proximity Approach Visualization (Extreme Movements)', fontsize=16) ax.set_xlabel('X Position', fontsize=12) ax.set_ylabel('Y Position', fontsize=12)# Get current positions p1_pos = p1_positions[i] p2_pos = p2_positions[i]# Draw the initial connecting vector (reference) ax.plot([p1_positions[0][0], p2_positions[0][0]], [p1_positions[0][1], p2_positions[0][1]], color=vector_color, linewidth=1, linestyle='--', alpha=0.5, zorder=0)# Draw the current connection vector ax.plot([p1_pos[0], p2_pos[0]], [p1_pos[1], p2_pos[1]], color=vector_color, linewidth=2.5, linestyle='-', zorder=1)# Draw person markers ax.scatter(p1_pos[0], p1_pos[1], s=180, color=p1_color, 
edgecolor='black', linewidth=1.5, zorder=3) ax.scatter(p2_pos[0], p2_pos[1], s=180, color=p2_color, edgecolor='black', linewidth=1.5, zorder=3)# Draw initial positions ax.scatter(p1_positions[0][0], p1_positions[0][1], s=90, color=p1_color, alpha=0.5, edgecolor='black', linewidth=1, zorder=2) ax.scatter(p2_positions[0][0], p2_positions[0][1], s=90, color=p2_color, alpha=0.5, edgecolor='black', linewidth=1, zorder=2)# Draw trails (last 15 positions) start_idx =max(0, i-15) ax.plot(p1_positions[start_idx:i+1, 0], p1_positions[start_idx:i+1, 1], '-', color=p1_color, alpha=0.5, linewidth=2) ax.plot(p2_positions[start_idx:i+1, 0], p2_positions[start_idx:i+1, 1], '-', color=p2_color, alpha=0.5, linewidth=2)# Visualize the approach values with text - make them larger and more visible approach_text = (f"P1 approach: {p1_approach_values[i]:.2f}\n"f"P2 approach: {p2_approach_values[i]:.2f}") ax.text(0.05, 0.95, approach_text, transform=ax.transAxes, verticalalignment='top', fontsize=14, fontweight='bold', bbox=dict(boxstyle='round', facecolor='white', alpha=0.9))# Add projection visualizationif i >0:# Get current connecting direction connect_vector = p2_pos - p1_pos connect_distance = np.linalg.norm(connect_vector) connect_direction = connect_vector / connect_distance# Person 1 projection p1_vector = p1_pos - p1_positions[0] p1_projection = np.dot(p1_vector, connect_direction) * connect_direction# Draw projection line for P1 p1_proj_point = p1_positions[0] + p1_projection# Person 2 projection - use direction from P2 to P1 p2_vector = p2_pos - p2_positions[0] p2_direction =-connect_direction # Direction from P2 to P1 p2_projection = np.dot(p2_vector, p2_direction) * p2_direction# Draw projection line for P2 p2_proj_point = p2_positions[0] + p2_projection# Draw projection lines with arrows to show direction# For P1 p1_approach = p1_approach_values[i]ifabs(p1_approach) >0.1: # Only draw if there's a significant approach value# Draw line with increased width 
ax.plot([p1_positions[0][0], p1_proj_point[0]], [p1_positions[0][1], p1_proj_point[1]], '--', color=p1_color, linewidth=2.5)# Add arrow to show direction - make arrow larger arrow_length =min(abs(p1_approach) *0.2, 0.6) # Scale arrow size arrow_width =0.15if p1_approach >0: # Approaching# Arrow pointing from initial position toward projection point ax.arrow(p1_positions[0][0], p1_positions[0][1], arrow_length * connect_direction[0], arrow_length * connect_direction[1], head_width=arrow_width, head_length=arrow_width*1.5, fc=p1_color, ec='black', linewidth=1, zorder=4)else: # Retreating# Arrow pointing from initial position away from projection point ax.arrow(p1_positions[0][0], p1_positions[0][1], -arrow_length * connect_direction[0], -arrow_length * connect_direction[1], head_width=arrow_width, head_length=arrow_width*1.5, fc=p1_color, ec='black', linewidth=1, zorder=4)# Add value label near the projection line mid_x = (p1_positions[0][0] + p1_proj_point[0]) /2 mid_y = (p1_positions[0][1] + p1_proj_point[1]) /2 ax.text(mid_x, mid_y, f"{p1_approach:.2f}", color=p1_color, fontsize=10, fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, boxstyle='round,pad=0.2'))# For P2 p2_approach = p2_approach_values[i]ifabs(p2_approach) >0.1: # Only draw if there's a significant approach value# Draw line with increased width ax.plot([p2_positions[0][0], p2_proj_point[0]], [p2_positions[0][1], p2_proj_point[1]], '--', color=p2_color, linewidth=2.5)# Add arrow to show direction - make arrow larger arrow_length =min(abs(p2_approach) *0.2, 0.6) # Scale arrow size arrow_width =0.15if p2_approach >0: # Approaching# Arrow pointing from initial position toward P1 ax.arrow(p2_positions[0][0], p2_positions[0][1], arrow_length * p2_direction[0], arrow_length * p2_direction[1], head_width=arrow_width, head_length=arrow_width*1.5, fc=p2_color, ec='black', linewidth=1, zorder=4)else: # Retreating# Arrow pointing from initial position away from P1 ax.arrow(p2_positions[0][0], 
p2_positions[0][1], -arrow_length * p2_direction[0], -arrow_length * p2_direction[1], head_width=arrow_width, head_length=arrow_width*1.5, fc=p2_color, ec='black', linewidth=1, zorder=4)# Add value label near the projection line mid_x = (p2_positions[0][0] + p2_proj_point[0]) /2 mid_y = (p2_positions[0][1] + p2_proj_point[1]) /2 ax.text(mid_x, mid_y, f"{p2_approach:.2f}", color=p2_color, fontsize=10, fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, boxstyle='round,pad=0.2'))# Add small markers at projection points ax.scatter(p1_proj_point[0], p1_proj_point[1], color=p1_color, s=70, marker='x') ax.scatter(p2_proj_point[0], p2_proj_point[1], color=p2_color, s=70, marker='x')# Draw horizontal dotted line at initial y-coordinate to emphasize horizontal start ax.axhline(y=p1_initial[1], color='gray', linestyle=':', alpha=0.5)# Add legend legend_elements = [ patches.Patch(facecolor=p1_color, edgecolor='black', label='Person 1'), patches.Patch(facecolor=p2_color, edgecolor='black', label='Person 2'), patches.Patch(facecolor=vector_color, edgecolor='black', label='Connection Vector'), mlines.Line2D([], [], color='gray', linestyle=':', label='Initial Horizontal Line'), mlines.Line2D([], [], color=p1_color, linestyle='--', linewidth=2, label='Projection Length = Approach Value') ] ax.legend(handles=legend_elements, loc='upper right', fontsize=10)# Add a frame counter ax.text(0.95, 0.05, f"Frame: {i}/{num_frames-1}", transform=ax.transAxes, horizontalalignment='right', verticalalignment='bottom', fontsize=10, bbox=dict(boxstyle='round', facecolor='white', alpha=0.5))# Add description text description = ("Proximity Calculation Visualization\n""- Large dots: Current positions\n""- Small dots: Initial positions (horizontal alignment)\n""- Green line: Current connecting vector\n""- Dashed lines: Projection of movement (length = approach value)\n""- Arrows: Direction of approach/retreat\n""- Positive values: Movement toward other person\n""- Negative values: Movement 
away from other person\n""- X markers: Projected positions" ) ax.text(0.05, 0.05, description, transform=ax.transAxes, verticalalignment='bottom', fontsize=9, bbox=dict(boxstyle='round', facecolor='white', alpha=0.7))return []# Create the animation with explicit FPS and DPI ani = FuncAnimation(fig, animate, frames=num_frames, init_func=init, blit=True, interval=100)# Save the animation as a GIF with explicit DPI ani.save('images/proximity_visualization/proximity_calculation_extreme.gif', writer='pillow', fps=10, dpi=100) plt.close(fig)# Run the animationtry:print("Creating animation...") create_detailed_animation()print("GIF created successfully in the 'images/proximity_visualization' folder.")exceptExceptionas e:print(f"Error creating animation: {e}")# Alternative approach if the above fails - create static framesprint("Trying alternative approach with static frames...")try:# Create a static image sequence instead os.makedirs("images/proximity_visualization/frames", exist_ok=True)# Create 10 static frames instead of full animation sample_frames = np.linspace(0, num_frames-1, 10, dtype=int)for idx, i inenumerate(sample_frames): fig, ax = plt.subplots(figsize=(10, 8), dpi=100)# Set up the axes ax.set_xlim(0, 10) ax.set_ylim(0, 10) ax.set_title(f'Proximity Calculation (Frame {i})', fontsize=14)# Get current positions p1_pos = p1_positions[i] p2_pos = p2_positions[i]# Draw the connection vector ax.plot([p1_pos[0], p2_pos[0]], [p1_pos[1], p2_pos[1]], color=vector_color, linewidth=2)# Draw person markers ax.scatter(p1_pos[0], p1_pos[1], s=150, color=p1_color, label='Person 1') ax.scatter(p2_pos[0], p2_pos[1], s=150, color=p2_color, label='Person 2')# Draw initial positions ax.scatter(p1_positions[0][0], p1_positions[0][1], s=50, color=p1_color, alpha=0.5) ax.scatter(p2_positions[0][0], p2_positions[0][1], s=50, color=p2_color, alpha=0.5)# Draw horizontal dotted line at initial y-coordinate ax.axhline(y=p1_initial[1], color='gray', linestyle=':', alpha=0.5)# Show 
approach values ax.text(0.5, 0.95, f"Person 1 approach: {p1_approach_values[i]:.2f}", transform=ax.transAxes, ha='center', va='top', bbox=dict(boxstyle='round', facecolor=p1_color, alpha=0.3)) ax.text(0.5, 0.9, f"Person 2 approach: {p2_approach_values[i]:.2f}", transform=ax.transAxes, ha='center', va='top', bbox=dict(boxstyle='round', facecolor=p2_color, alpha=0.3)) ax.legend()# Save the frame plt.tight_layout() plt.savefig(f'images/proximity_visualization/frames/frame_{idx:02d}.png') plt.close(fig)print("Static frames created successfully in 'images/proximity_visualization/frames' folder.")exceptExceptionas e:print(f"Alternative approach also failed: {e}")
Creating animation...
GIF created successfully in the 'images/proximity_visualization' folder.
Figure S5. Proximity Calculation Visualization
The output of this script is a timeseries matrix for each video, containing the time in seconds and the calculated variables. The timeseries matrices are saved in the ./dataoutput_STEP1_2_timeseries/ folder. The code below is the script STEP2_process_timeseries.py, which you can run to process the raw pose data.
Pipeline code step 1.2 for processing to timeseries data 🚩
Code
import pandas as pd
import numpy as np
import glob
import os
import cv2
import math
from bisect import bisect_left
from scipy.signal import savgol_filter

# Path Definitions
INPUT_LAYER1_PATH = '../dataoutput_STEP1_1_rawposedata/'  # Input directory containing tracked keypoint data from STEP1
VIDEO_PATH = "../data_raw/"                               # Raw video files directory
OUTPUT_PATH = '../dataoutput_STEP1_2_timeseries/'         # Output directory for processed timeseries data

# Variable Explanations:
# =====================================================
# Positional Variables:
# - x, y: 2D coordinates in the image frame
# - x_min, x_max, y_min, y_max: Bounding box coordinates for a person
# - centroid_x, centroid_y: Center point of a person's bounding box
# - com_x, com_y: Center of mass for a person (average of upper body keypoint positions)
# - shoulder_midpoint_*: Midpoint between left and right shoulders for each person (p1, p2)
# - wrist_left_*, wrist_right_*: Positions of left and right wrists for each person (p1, p2)
#
# Distance Variables:
# - distance: Euclidean distance between the centroids of both people
# - distance_com: Euclidean distance between centers of mass of both people
# - distance_shoulder_midpoint: Distance between shoulder midpoints of both people
# - *_speed: Euclidean speed of different body parts (calculated from position differences)
# - *_speed_smooth: Smoothed version of speed using Savitzky-Golay filter
#
# Movement Analysis:
# - p1_com_approach_pos, p2_com_approach_pos: Position of each person's COM relative to their
#   initial position, projected onto the connecting line between people. Positive values
#   indicate movement toward the other person.
#
# Tracking Status:
# - both_tracked: Boolean indicating if both people are tracked in the current frame
# - single_tracked: Boolean indicating if only one person is tracked in the current frame
# =====================================================

def frame_to_time(ts, fps):
    """Convert frame numbers to time in seconds"""
    ts["time"] = [row[1]["frame"] / fps for row in ts.iterrows()]
    return ts

def take_closest(myList, myNumber):
    """
    Assumes myList is sorted. Returns closest value to myNumber.
    If two numbers are equally close, return the smallest number.
    """
    pos = bisect_left(myList, myNumber)
    if pos == 0:
        return myList[0]
    if pos == len(myList):
        return myList[-1]
    before = myList[pos - 1]
    after = myList[pos]
    if after - myNumber < myNumber - before:
        return after
    else:
        return before

def assign_left_right(ts):
    """Assign person IDs (0 or 1) based on x-position in the frame"""
    min_x = np.min([val for val in ts['x'] if val != 0])
    max_x = np.max([val for val in ts['x'] if val != 0])
    for index, row in ts.iterrows():
        if row['x'] == 0:  # Skip untracked points
            continue
        if take_closest([min_x, max_x], row['x']) == min_x:
            ts.loc[index, 'person'] = 0  # Left person
        else:
            ts.loc[index, 'person'] = 1  # Right person
    return ts

def process_time_data(ts):
    """Calculate statistics for tracked people using upper body keypoints"""
    ts_upper = ts[ts['keypoint'] < 11]  # Filter to upper body keypoints only
    # Calculate stats per person and time
    stats = ts_upper.groupby(['time', 'person']).agg({
        'x': ['min', 'max', 'mean'],
        'y': ['min', 'max', 'mean']
    }).reset_index()
    stats.columns = ['time', 'person', 'x_min', 'x_max', 'com_x', 'y_min', 'y_max', 'com_y']
    stats['centroid_x'] = (stats['x_min'] + stats['x_max']) / 2
    stats['centroid_y'] = (stats['y_min'] + stats['y_max']) / 2
    return stats

def process_people_data_vectorized(ts, stats):
    """
    Process keypoint data to calculate relative positions and distances.
    Preserves time alignment and handles COM/centroid assignment.
    """
    # Get all unique times from the original data
    all_times = sorted(ts['time'].unique())
    result = pd.DataFrame(index=all_times)
    result.index.name = 'time'
    # Process shoulders first (keypoints 5 and 6)
    shoulders = ts[ts['keypoint'].isin([5, 6])].groupby(['time', 'person']).agg({
        'x': 'mean',
        'y': 'mean'
    }).reset_index()
    # Process wrists (keypoints 7 and 8)
    wrists = ts[ts['keypoint'].isin([7, 8])].pivot_table(
        index=['time', 'person'], columns='keypoint', values=['x', 'y']
    ).reset_index()
    wrists.columns = ['time', 'person', 'left_x', 'right_x', 'left_y', 'right_y']
    # Get person-specific data
    p0_stats = stats[stats['person'] == 0].set_index('time')
    p1_stats = stats[stats['person'] == 1].set_index('time')
    # Process distances where both people exist
    common_times = sorted(set(p0_stats.index) & set(p1_stats.index))
    if common_times:  # Only calculate if we have common times
        result.loc[common_times, 'distance'] = np.sqrt(
            (p0_stats.loc[common_times, 'centroid_x'] - p1_stats.loc[common_times, 'centroid_x'])**2
            + (p0_stats.loc[common_times, 'centroid_y'] - p1_stats.loc[common_times, 'centroid_y'])**2
        )
        result.loc[common_times, 'distance_com'] = np.sqrt(
            (p0_stats.loc[common_times, 'com_x'] - p1_stats.loc[common_times, 'com_x'])**2
            + (p0_stats.loc[common_times, 'com_y'] - p1_stats.loc[common_times, 'com_y'])**2
        )
    # Process shoulders per person
    for person, prefix in [(0, 'p1'), (1, 'p2')]:
        s_person = shoulders[shoulders['person'] == person].set_index('time')
        result[f'shoulder_midpoint_{prefix}_x'] = s_person['x']
        result[f'shoulder_midpoint_{prefix}_y'] = s_person['y']
    # Calculate shoulder midpoint distance where possible
    shoulder_cols = ['shoulder_midpoint_p1_x', 'shoulder_midpoint_p2_x',
                     'shoulder_midpoint_p1_y', 'shoulder_midpoint_p2_y']
    shoulder_mask = result[shoulder_cols].notna().all(axis=1)
    if shoulder_mask.any():
        result.loc[shoulder_mask, 'distance_shoulder_midpoint'] = np.sqrt(
            (result.loc[shoulder_mask, 'shoulder_midpoint_p1_x'] -
             result.loc[shoulder_mask, 'shoulder_midpoint_p2_x'])**2
            + (result.loc[shoulder_mask, 'shoulder_midpoint_p1_y'] -
               result.loc[shoulder_mask, 'shoulder_midpoint_p2_y'])**2
        )
    # Process wrists per person and add COM/centroid
    for person, prefix in [(0, 'p1'), (1, 'p2')]:
        w_person = wrists[wrists['person'] == person].set_index('time')
        result[f'wrist_left_{prefix}_x'] = w_person['left_x']
        result[f'wrist_left_{prefix}_y'] = w_person['left_y']
        result[f'wrist_right_{prefix}_x'] = w_person['right_x']
        result[f'wrist_right_{prefix}_y'] = w_person['right_y']
        # Use the correct stats object based on person ID
        person_stats = p0_stats if person == 0 else p1_stats
        # Add com and centroid with the correct stats object
        result[f'com_{prefix}_x'] = person_stats['com_x']
        result[f'com_{prefix}_y'] = person_stats['com_y']
        result[f'centroid_{prefix}_x'] = person_stats['centroid_x']
        result[f'centroid_{prefix}_y'] = person_stats['centroid_y']
    # Add tracking quality indicators using original data
    person_counts = ts.groupby('time')['person'].nunique()
    result['both_tracked'] = result.index.map(lambda x: person_counts.get(x, 0) == 2)
    result['single_tracked'] = result.index.map(lambda x: person_counts.get(x, 0) == 1)
    return result

def calculate_proximity_approach(timeseries_data):
    """
    Calculate each person's position relative to their initial position,
    projecting movement onto the current connecting line between people.
    This handles rotation and changing spatial relationships between people.
    Positive values indicate movement toward the other person from initial position.
    The reference positions are only established when both people are detected.
    """
    # Create columns for our measurements
    if 'p1_com_approach_pos' not in timeseries_data.columns:
        timeseries_data['p1_com_approach_pos'] = np.nan
    if 'p2_com_approach_pos' not in timeseries_data.columns:
        timeseries_data['p2_com_approach_pos'] = np.nan
    # Sort data by time
    sorted_data = timeseries_data.sort_values('time')
    # Find first valid frame to establish reference positions
    reference_p1_pos = None
    reference_p2_pos = None
    reference_distance = None
    # First pass: find reference frame where both people are detected
    for idx, row in sorted_data.iterrows():
        # Skip rows with NaN values or where both_tracked is False
        if (np.isnan(row['com_p1_x']) or np.isnan(row['com_p1_y']) or
                np.isnan(row['com_p2_x']) or np.isnan(row['com_p2_y']) or
                ('both_tracked' in row.index and row['both_tracked'] == False)):
            continue
        # Get positions for this frame
        p1_pos = np.array([row['com_p1_x'], row['com_p1_y']])
        p2_pos = np.array([row['com_p2_x'], row['com_p2_y']])
        # Calculate connecting vector
        connect_vector = p2_pos - p1_pos
        distance = np.linalg.norm(connect_vector)
        if distance > 0:
            # We found a valid reference frame
            reference_p1_pos = p1_pos.copy()
            reference_p2_pos = p2_pos.copy()
            reference_distance = distance
            print(f"Reference frame established at time={row['time']}")
            print(f"  Reference p1_pos: {reference_p1_pos}")
            print(f"  Reference p2_pos: {reference_p2_pos}")
            print(f"  Reference distance: {reference_distance}")
            break
    if reference_p1_pos is None:
        print("ERROR: Could not establish a reference frame. No valid frames found with both people detected.")
        return timeseries_data
    # Second pass: calculate projected positions for all frames
    for idx, row in sorted_data.iterrows():
        # Skip rows with NaN values
        if (np.isnan(row['com_p1_x']) or np.isnan(row['com_p1_y']) or
                np.isnan(row['com_p2_x']) or np.isnan(row['com_p2_y'])):
            continue
        # Get current positions
        p1_pos = np.array([row['com_p1_x'], row['com_p1_y']])
        p2_pos = np.array([row['com_p2_x'], row['com_p2_y']])
        # Calculate current connecting vector and direction
        current_connect = p2_pos - p1_pos
        current_distance = np.linalg.norm(current_connect)
        # Skip frames where people are at the same position
        if current_distance == 0:
            continue
        current_direction = current_connect / current_distance
        # Calculate vector from reference position to current position
        p1_vector = p1_pos - reference_p1_pos
        p2_vector = p2_pos - reference_p2_pos
        # Project these vectors onto the current connecting line
        # Positive values mean moving toward the other person
        p1_projection = np.dot(p1_vector, current_direction)
        p2_projection = -np.dot(p2_vector, -current_direction)
        # Store values
        timeseries_data.loc[idx, 'p1_com_approach_pos'] = p1_projection
        timeseries_data.loc[idx, 'p2_com_approach_pos'] = p2_projection
    # Verify results
    filled_p1 = timeseries_data['p1_com_approach_pos'].notna().sum()
    filled_p2 = timeseries_data['p2_com_approach_pos'].notna().sum()
    print(f"Frames with filled values - p1: {filled_p1}, p2: {filled_p2}")
    return timeseries_data

def main():
    """Main processing function"""
    # Create output directory if it doesn't exist
    os.makedirs(OUTPUT_PATH, exist_ok=True)
    # Get all CSV files from layer 1 processing
    layer1_files = glob.glob(INPUT_LAYER1_PATH + '*.csv')
    # Process each file
    for file_path in layer1_files:
        # Extract video name from file path
        vid_name = os.path.basename(file_path).split('_keypoints_data_layer1.csv')[0]
        print(f"Processing video: {vid_name}")
        # Check if already processed
        output_file = f"{OUTPUT_PATH}/{vid_name}_processed_data_layer2.csv"
        if os.path.exists(output_file):
            print("Already processed, skipping...")
            continue
        # Determine video format and get FPS
        video_path = None
        for ext in ['.avi', '.mp4', '.mov']:
            if os.path.exists(f"{VIDEO_PATH}/{vid_name}{ext}"):
                video_path = f"{VIDEO_PATH}/{vid_name}{ext}"
                break
        if not video_path:
            print(f"Video file not found for {vid_name}")
            continue
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        cap.release()
        print(f"Working on: {vid_name} with fps = {fps}")
        # Load and preprocess data
        ts = pd.read_csv(file_path)
        ts = frame_to_time(ts, fps=fps)
        ts = assign_left_right(ts)
        # Calculate statistics
        stats = process_time_data(ts)
        # Process tracked data
        processed_data = process_people_data_vectorized(ts, stats)
        # Create final data frame
        bb_data = pd.merge(
            stats, processed_data.reset_index(), on='time', how='outer'
        )
        # Extract time series data (using person 0 as reference)
        timeseries_data = bb_data[bb_data['person'] == 0].copy()  # Make a copy to avoid SettingWithCopyWarning
        # Remove unnecessary columns
        timeseries_data = timeseries_data.drop(columns='person')
        # Calculate desired time step based on FPS
        desired_time_step = 1 / fps
        # Interpolate missing values
        nan_cols = timeseries_data.columns[timeseries_data.isna().any()].tolist()
        timeseries_data[nan_cols] = timeseries_data[nan_cols].interpolate()
        # Smooth distance with Savitzky-Golay filter
        timeseries_data['distance_smooth'] = savgol_filter(timeseries_data['distance'].values, 11, 3)
        # Calculate and smooth wrist speeds
        for wrist in ['wrist_left_p1', 'wrist_right_p1', 'wrist_left_p2', 'wrist_right_p2']:
            # Calculate speed as Euclidean distance between consecutive positions
            timeseries_data[f'{wrist}_speed'] = np.sqrt(
                timeseries_data[f'{wrist}_x'].diff()**2
                + timeseries_data[f'{wrist}_y'].diff()**2
            )
            # Apply Savitzky-Golay filter for smoothing
            timeseries_data[f'{wrist}_speed_smooth'] = savgol_filter(
                timeseries_data[f'{wrist}_speed'].values, 11, 3
            )
        # Fill NaN values before calculating proximity approach
        timeseries_data = timeseries_data.fillna(method='ffill')
        # Calculate proximity approach
        timeseries_data = calculate_proximity_approach(timeseries_data)
        # Fill any remaining NaN values
        timeseries_data = timeseries_data.fillna(method='ffill')
        # Smooth proximity approach values
        timeseries_data['p1_com_approach_pos'] = savgol_filter(timeseries_data['p1_com_approach_pos'].values, 11, 3)
        timeseries_data['p2_com_approach_pos'] = savgol_filter(timeseries_data['p2_com_approach_pos'].values, 11, 3)
        # Check for any remaining NaN values
        print("Checking for NaN values...")
        nan_counts = timeseries_data.isna().sum()
        if nan_counts.sum() > 0:
            print("Found NaN values after processing:")
            print(nan_counts)
        else:
            print("No NaN values found.")
        # Save processed data
        timeseries_data.to_csv(output_file, index=False)
        # Check for time continuity
        print("Checking time continuity...")
        time_diffs = np.array([round(val, 2) for val in np.diff(timeseries_data['time'])])
        if not np.all(time_diffs == desired_time_step):
            print(f"Found gaps at times: {np.where(time_diffs != desired_time_step)[0]}")
        else:
            print("No missing time points.")

if __name__ == "__main__":
    main()
To provide an example of the resulting data, we print the first rows of a resultant timeseries below.
Example output timeseries: 📊
Code
import pandas as pd
import glob as glob
import os

folderstep12output = "./dataoutput_STEP1_2_timeseries/"

# Load the CSV file
csv_file = glob.glob(os.path.join(folderstep12output + "*.csv"))[0]
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
We advise users to double-check the quality of the tracking data and the derived variables, both to verify quality and to get a better intuitive and qualitative understanding of their quantitative methods. We have argued elsewhere that, especially when working with derived measurements of high-dimensional data, researchers can train to become better in their intuitive grasp of their measurements by relating them in real time to the raw video data (Miao et al. 2025). In this way we become better at understanding what the measure is and is not responding to; which, as we mention in the introduction to the companion paper, means becoming observant rather than a disinterested observer (Hacking 1983). The animation below is generated using the script STEP3_animation.py, which couples video with quantitative data, allowing the user to qualitatively check the timeseries in real time against what is happening in the video. The user can select which variables are to be plotted in the combined video-plus-timeseries animation. The code below provides the animation. Please check the DIY section for how to set up the motion tracking and run the code.
Code chunk: Animation code for coupling video with quantitative data 🚩
Code
import pandas as pd
import numpy as np
import glob
import os
import cv2
import math
from bisect import bisect_left
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
import seaborn as sns
import tqdm
import tempfile
from moviepy import VideoFileClip

# Path Definitions
INPUT_LAYER1_PATH = '../dataoutput_STEP1_2_timeseries/'  # Input directory containing tracked keypoint data from STEP1
VIDEO_PATH = "../dataoutput_STEP1_1_rawposedata/"        # Raw video files directory
OUTPUT_PATH = '../dataoutput_STEP1_3_animations/'        # Output directory for processed timeseries data
targetvideo = "*sample_annotated_layer1_c150_miss95.mp4" # note that SAMPLE_VIDEO must be set to True to process only a sample video
SAMPLE_VIDEO = True  # Set to True to process only a sample video, False to process all videos

# Create output directory if it doesn't exist
os.makedirs(OUTPUT_PATH, exist_ok=True)

# animate only sample video?
if SAMPLE_VIDEO:
    # For sample video, use a specific video file
    allvidsnew = glob.glob(VIDEO_PATH + "*" + targetvideo)
else:
    allvidsnew = glob.glob(VIDEO_PATH + "*.mp4")
print(f"Found {len(allvidsnew)} videos to process")

# what variables to animate with video
animate_variables = {
    'x_min': False, 'x_max': False, 'com_x': False,
    'y_min': False, 'y_max': False, 'com_y': False,
    'centroid_x': False, 'centroid_y': False,
    'distance': False, 'distance_com': False,
    'shoulder_midpoint_p1_x': False, 'shoulder_midpoint_p1_y': False,
    'shoulder_midpoint_p2_x': False, 'shoulder_midpoint_p2_y': False,
    'distance_shoulder_midpoint': True,
    'wrist_left_p1_x': False, 'wrist_left_p1_y': False,
    'wrist_right_p1_x': False, 'wrist_right_p1_y': False,
    'com_p1_x': False, 'com_p1_y': False,
    'centroid_p1_x': False, 'centroid_p1_y': False,
    'wrist_left_p2_x': False, 'wrist_left_p2_y': False,
    'wrist_right_p2_x': False, 'wrist_right_p2_y': False,
    'com_p2_x': False, 'com_p2_y': False,
    'centroid_p2_x': False, 'centroid_p2_y': False,
    'distance_smooth': True,
    'wrist_left_p1_speed': False, 'wrist_left_p1_speed_smooth': True,
    'wrist_right_p1_speed': False, 'wrist_right_p1_speed_smooth': True,
    'wrist_left_p2_speed': False, 'wrist_left_p2_speed_smooth': True,
    'wrist_right_p2_speed': False, 'wrist_right_p2_speed_smooth': True,
    'p1_com_approach_pos': True, 'p2_com_approach_pos': True
}

# plotting functions
def plot_timeseries(timeseries_data, current_time=None, figsize=(15, 8)):
    """
    Visualize the processed 1_2 motion variable data together with the original video.
    Groups variables by modality with consistent coloring for p1/p2 and left/right.

    Parameters:
    -----------
    timeseries_data : pandas.DataFrame
        DataFrame containing all the motion analysis columns
    current_time : float, optional
        Current time in seconds to mark with a vertical line
    figsize : tuple, optional
        Figure size in inches (width, height)
    """
    # Filter data for only variables marked as True in animate_variables
    active_vars = [var for var, active in animate_variables.items()
                   if active and var in timeseries_data.columns]
    if not active_vars:
        print("No variables selected for animation!")
        return None

    # Define consistent color scheme
    # Colors for p1/p2 (blue family for p1, red family for p2)
    p1_colors = {'left': '#1f77b4', 'right': '#aec7e8'}  # Dark blue, light blue
    p2_colors = {'left': '#d62728', 'right': '#ff9896'}  # Dark red, light red
    other_colors = ['#ff7f0e', '#2ca02c', '#9467bd', '#8c564b',
                    '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']

    # Group variables by modality
    distance_vars = [var for var in active_vars if 'distance' in var]
    position_vars = [var for var in active_vars
                     if any(x in var for x in ['_x', '_y', 'approach_pos'])
                     and 'distance' not in var]
    speed_vars = [var for var in active_vars if 'speed' in var]

    # Calculate number of subplots needed
    n_plots = sum([len(distance_vars) > 0, len(position_vars) > 0, len(speed_vars) > 0])
    if n_plots == 0:
        return None

    # Create figure with subplots
    fig, axes = plt.subplots(n_plots, 1, figsize=figsize, squeeze=False)
    axes = axes.flatten()
    plot_idx = 0

    # Helper function to get consistent color and style
    def get_style(var_name):
        # Determine person (p1/p2)
        if '_p1_' in var_name:
            person = 'p1'
        elif '_p2_' in var_name:
            person = 'p2'
        else:
            person = 'other'
        # Determine side (left/right)
        if 'left' in var_name:
            side = 'left'
        elif 'right' in var_name:
            side = 'right'
        else:
            side = 'other'
        # Get color
        if person == 'p1':
            color = p1_colors.get(side, p1_colors['left'])
        elif person == 'p2':
            color = p2_colors.get(side, p2_colors['left'])
        else:
            color = other_colors[0]  # Use first color for non-person variables
        # Get linestyle (solid for left, dashed for right)
        linestyle = '--' if side == 'right' else '-'
        # Get alpha (lower for raw data, full for smoothed)
        if 'smooth' in var_name:
            alpha = 1.0
        elif any(x in var_name for x in ['speed', 'velocity']) and 'smooth' not in var_name:
            alpha = 0.3
        else:
            alpha = 1.0
        return color, linestyle, alpha

    # 1. Distance plots
    if distance_vars:
        ax = axes[plot_idx]
        color_idx = 0
        for var in distance_vars:
            if any(x in var for x in ['_p1_', '_p2_']):
                color, linestyle, alpha = get_style(var)
            else:
                color = other_colors[color_idx % len(other_colors)]
                linestyle = '-'
                alpha = 1.0
                color_idx += 1
            ax.plot(timeseries_data['time'], timeseries_data[var],
                    label=var.replace('_', ' '), linewidth=2,
                    color=color, linestyle=linestyle, alpha=alpha)
        ax.set_title('Distances Over Time')
        ax.set_ylabel('Distance')
        ax.grid(True, alpha=0.3)
        ax.legend()
        if 'distance_shoulder_midpoint' in distance_vars or 'distance' in distance_vars:
            ax.invert_yaxis()
        plot_idx += 1

    # 2. Position plots (group p1/p2 and left/right together)
    if position_vars:
        ax = axes[plot_idx]
        for var in position_vars:
            color, linestyle, alpha = get_style(var)
            ax.plot(timeseries_data['time'], timeseries_data[var],
                    label=var.replace('_', ' '), linewidth=2,
                    color=color, linestyle=linestyle, alpha=alpha)
        ax.set_title('Positions Over Time')
        ax.set_ylabel('Position')
        ax.grid(True, alpha=0.3)
        ax.legend()
        plot_idx += 1

    # 3. Speed plots (group p1/p2 and left/right together)
    if speed_vars:
        ax = axes[plot_idx]
        for var in speed_vars:
            color, linestyle, alpha = get_style(var)
            ax.plot(timeseries_data['time'], timeseries_data[var],
                    label=var.replace('_', ' '), linewidth=2,
                    color=color, linestyle=linestyle, alpha=alpha)
        ax.set_title('Speeds Over Time')
        ax.set_ylabel('Speed')
        ax.grid(True, alpha=0.3)
        ax.legend()
        plot_idx += 1

    # Add vertical line for current time if specified
    if current_time is not None:
        for ax in axes[:plot_idx]:
            ax.axvline(x=current_time, color='red', linestyle='--', alpha=0.7, linewidth=2)

    # Set common x-label
    axes[plot_idx - 1].set_xlabel('Time (seconds)')
    # Adjust layout
    plt.tight_layout()
    return fig

# Create animation for each video
for vids in allvidsnew:
    vidname = os.path.basename(vids)
    # remove substring "_annotated_layer1"
    lab = "_annotated_layer1"
    vidname = vidname.replace(lab, "")
    vidname = vidname[:-4]
    # also remove substring "_c150_miss95"
    vidname = vidname.replace("_c150_miss95", "")
    print(f"\nProcessing video: {vidname}")

    # Check if CSV file exists
    csv_path = os.path.join(INPUT_LAYER1_PATH, f'{vidname}_processed_data_layer2.csv')
    if not os.path.exists(csv_path):
        print(f"CSV file not found: {csv_path}")
        continue

    # Load the CSV file
    timeseries_data = pd.read_csv(csv_path)

    # Output paths
    temp_output = os.path.join(OUTPUT_PATH, f'{vidname}_distance_layer2_temp.mp4')
    final_output = os.path.join(OUTPUT_PATH, f'{vidname}_distance_layer2.mp4')

    # if already exists, skip
    if os.path.exists(final_output):
        print("Already processed, skipping...")
        continue

    # load the video file in OpenCV
    cap = cv2.VideoCapture(vids)
    if not cap.isOpened():
        print(f"Error opening video: {vids}")
        continue

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Video properties: {width}x{height}, {fps} FPS, {total_frames} frames")

    # Calculate time step based on FPS
    time_step = 1.0 / fps
    print(f"Time step: {time_step:.4f} seconds per frame")

    # Define the output video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))

    # Create temporary directory for plots
    with tempfile.TemporaryDirectory() as temp_dir:
        print("Creating animated video...")
        # loop over the frames with tqdm progress bar
        current_time = 0.0  # Start at time 0
        for frame_idx in tqdm.tqdm(range(total_frames), desc=f"Processing {vidname}"):
            # read the frame
            success, frame = cap.read()
            if not success:
                break
            # plot the timeseries with current time
            fig = plot_timeseries(timeseries_data, current_time)
            if fig is None:
                print("No plot generated, skipping frame")
                out.write(frame)
                current_time += time_step  # Increment by time step
                continue
            # save the plot to a temp file
            plot_path = os.path.join(temp_dir, f'plot_{frame_idx:06d}.png')
            fig.savefig(plot_path, dpi=100, bbox_inches='tight')
            plt.close(fig)  # Important: close figure to free memory
            # Read the plot image
            plot_img = cv2.imread(plot_path)
            if plot_img is None:
                print(f"Error reading plot image: {plot_path}")
                out.write(frame)
                current_time += time_step  # Increment by time step
                continue
            # Calculate overlay area (bottom third of the frame)
            slice_start = 2 * (height // 3)
            slice_height = height - slice_start
            # Resize plot to fit overlay area
            plot_img_resized = cv2.resize(plot_img, (width, slice_height))
            # Create overlay on the frame
            frame_copy = frame.copy()
            frame_copy[slice_start:slice_start + slice_height, :, :] = plot_img_resized
            # write the frame to the output video
            out.write(frame_copy)
            current_time += time_step  # Increment by time step (seconds)

    # Release everything
    cap.release()
    out.release()
    print("Temporary video saved, now re-encoding with MoviePy...")

    # Re-encode with MoviePy for better compatibility
    try:
        clip = VideoFileClip(temp_output)
        clip.write_videofile(final_output, codec='libx264', audio_codec='aac')
        clip.close()
        # Remove temporary file
        if os.path.exists(temp_output):
            os.remove(temp_output)
        print(f"Final output video saved as {final_output}")
    except Exception as e:
        print(f"Error during MoviePy re-encoding: {e}")
        print(f"Temporary video available at: {temp_output}")

print("\nAll videos processed!")
Figure S6. Animation of motion tracking data coupled with video
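The core overlay trick in the animation code above can be sketched in isolation: render a matplotlib figure to pixels, resize it, and paste it into the bottom third of a video frame. The sketch below uses a synthetic black frame, matplotlib's off-screen Agg canvas, and a plain numpy nearest-neighbour resize so it runs without OpenCV or a video file; the frame size and plotted data are made up for illustration (the pipeline itself uses `cv2.VideoCapture`, `cv2.imread`, and `cv2.resize`).

```python
import numpy as np
import matplotlib
matplotlib.use('Agg')  # render off-screen, no display needed
import matplotlib.pyplot as plt

# A stand-in video frame (the pipeline reads real frames with cv2.VideoCapture)
height, width = 480, 640
frame = np.zeros((height, width, 3), dtype=np.uint8)

# Render a tiny timeseries plot and grab its pixels from the Agg canvas
fig, ax = plt.subplots(figsize=(6, 2), dpi=100)
ax.plot(np.sin(np.linspace(0, 10, 200)))
fig.canvas.draw()
plot_img = np.asarray(fig.canvas.buffer_rgba())[:, :, :3]  # drop alpha channel
plt.close(fig)

# Overlay area: bottom third of the frame, as in the pipeline
slice_start = 2 * (height // 3)
slice_height = height - slice_start

# Nearest-neighbour resize with plain numpy indexing (pipeline uses cv2.resize)
rows = np.linspace(0, plot_img.shape[0] - 1, slice_height).astype(int)
cols = np.linspace(0, plot_img.shape[1] - 1, width).astype(int)
plot_resized = plot_img[rows][:, cols]

# Paste the plot into the frame copy
overlaid = frame.copy()
overlaid[slice_start:slice_start + slice_height, :, :] = plot_resized
print(overlaid.shape)  # same shape as the input frame
```

Writing the pasted frame to a `cv2.VideoWriter`, frame by frame, yields the coupled animation shown in Figure S6.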
4 Step 2: Summary feature extraction: Smoothness and other features
In the main paper accompanying this notebook we have motivated that smoothness can be a useful feature for understanding the quality of interactions. Our pipeline focuses on extracting this specific feature, while we also show how easily other features can be generated from the time series data produced in step 1.
We take two approaches to quantifying the smoothness of the movement data. The first approach is based on the spectral arc length metric, which has been shown to be a stable measure of smoothness for prolonged (quasi-cyclical) time series data (Balasubramanian et al. 2015). The approach analyzes the frequency spectrum, specifically the changes in energy over the lower frequencies (< 10 Hz by default; this can be set by the user). Concretely, it calculates the spectral arc length (the distance between two points along a curve) from the Fourier magnitude spectrum of the speed data (or, in our case, of the changes in the distance or proximity positions). This code is directly copied from the original repository by the lead developer of the metric, Sivakumar Balasubramanian. We refer to Balasubramanian et al. (2015) for further details and validation of this measure of smoothness. Higher (less negative) values of the spectral arc length indicate smoother movements, while lower values indicate jerkier movements.
Pipeline code step 2, function for calculating spectral arc length 🚩
Code
import numpy as np

# directly copied from the original repository:
# https://github.com/siva82kb/smoothness/blob/master/python/smoothness.py
def spectral_arclength(movement, fs, padlevel=4, fc=10.0, amp_th=0.05):
    """
    Calculates the smoothness of the given speed profile using the modified
    spectral arc length metric.

    Parameters
    ----------
    movement : np.array
        The array containing the movement speed profile.
    fs : float
        The sampling frequency of the data.
    padlevel : integer, optional
        Indicates the amount of zero padding to be done to the movement
        data for estimating the spectral arc length. [default = 4]
    fc : float, optional
        The max. cut-off frequency for calculating the spectral arc
        length metric. [default = 10.]
    amp_th : float, optional
        The amplitude threshold used for determining the cut-off frequency
        up to which the spectral arc length is to be estimated. [default = 0.05]

    Returns
    -------
    sal : float
        The spectral arc length estimate of the given movement's smoothness.
    (f, Mf) : tuple of two np.arrays
        The frequency (f) and the magnitude spectrum (Mf) of the given
        movement data. This spectrum runs from 0 to fs/2.
    (f_sel, Mf_sel) : tuple of two np.arrays
        The portion of the spectrum that is selected for calculating the
        spectral arc length.

    Notes
    -----
    This is the modified spectral arc length metric, which has been tested
    only for discrete movements. It is suitable for movements that are a few
    seconds long, but for long movements it might be slow and results might
    not make sense (like any other smoothness metric).

    Examples
    --------
    >>> t = np.arange(-1, 1, 0.01)
    >>> move = np.exp(-5*pow(t, 2))
    >>> sal, _, _ = spectral_arclength(move, fs=100.)
    >>> '%.5f' % sal
    '-1.41403'
    """
    # Number of zeros to be padded.
    nfft = int(pow(2, np.ceil(np.log2(len(movement))) + padlevel))
    # Frequency
    f = np.arange(0, fs, fs / nfft)
    # Normalized magnitude spectrum
    Mf = abs(np.fft.fft(movement, nfft))
    Mf = Mf / max(Mf)
    # Indices to choose only the spectrum within the given cut-off frequency fc.
    # NOTE: This is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cut-off for
    # arc length calculation).
    fc_inx = ((f <= fc) * 1).nonzero()
    f_sel = f[fc_inx]
    Mf_sel = Mf[fc_inx]
    # Choose the amplitude threshold based cut-off frequency.
    # Index of the last point on the magnitude spectrum that is greater than
    # or equal to the amplitude threshold.
    inx = ((Mf_sel >= amp_th) * 1).nonzero()[0]
    fc_inx = range(inx[0], inx[-1] + 1)
    f_sel = f_sel[fc_inx]
    Mf_sel = Mf_sel[fc_inx]
    # Calculate arc length
    new_sal = -sum(np.sqrt(pow(np.diff(f_sel) / (f_sel[-1] - f_sel[0]), 2) +
                           pow(np.diff(Mf_sel), 2)))
    return new_sal, (f, Mf), (f_sel, Mf_sel)
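As a quick sanity check of the metric's behavior, here is a condensed restatement of the arc-length computation above (docstring and the intermediate spectrum returns omitted), applied to the bell-shaped speed profile from the docstring example and to a noisier copy; the noisy profile should score lower (more negative). The helper name `sal`, the noise level, and the random seed are our own choices for illustration.

```python
import numpy as np

def sal(movement, fs, padlevel=4, fc=10.0, amp_th=0.05):
    # Condensed restatement of spectral_arclength above (same computation)
    nfft = int(pow(2, np.ceil(np.log2(len(movement))) + padlevel))
    f = np.arange(0, fs, fs / nfft)              # frequency axis
    Mf = np.abs(np.fft.fft(movement, nfft))
    Mf = Mf / Mf.max()                           # normalized magnitude spectrum
    keep = f <= fc                               # low-pass cut-off
    f_sel, Mf_sel = f[keep], Mf[keep]
    inx = np.nonzero(Mf_sel >= amp_th)[0]        # amplitude-threshold cut-off
    f_sel = f_sel[inx[0]:inx[-1] + 1]
    Mf_sel = Mf_sel[inx[0]:inx[-1] + 1]
    # Arc length of the (frequency-normalized) magnitude spectrum curve
    return -np.sum(np.sqrt((np.diff(f_sel) / (f_sel[-1] - f_sel[0])) ** 2 +
                           np.diff(Mf_sel) ** 2))

rng = np.random.default_rng(0)
t = np.arange(-1, 1, 0.01)
smooth = np.exp(-5 * t ** 2)                     # bell-shaped speed profile
noisy = smooth + rng.normal(0, 0.3, len(t))      # same profile plus noise

s_smooth = sal(smooth, fs=100.)
s_noisy = sal(noisy, fs=100.)
print(s_smooth, s_noisy)
```

Because noise spreads energy into higher frequencies, the noisy profile's spectrum has a longer arc, yielding a more negative value.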
The second approach is based on the log dimensionless squared jerk metric, a traditional measure of smoothness in biomechanics (Hogan and Sternad 2009). Jerk is the third derivative of our proximity time series with respect to time. We therefore take the change in proximity from timepoint to timepoint, divided by the time interval. Doing this once, on the proximity values, gives us the first derivative: speed. The derivative of speed is calculated the same way, giving us acceleration, or how fast the speed is changing from timepoint to timepoint. The derivative of acceleration is then jerk. We utilize jerk here because of its theoretical importance in the human movement sciences, for example in modeling complex biomechanical movements as well as interpersonal coordination.
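The derivative chain just described can be sketched with `np.gradient`, which implements exactly this finite-difference operation (central differences in the interior). The sampling rate and the sinusoidal "proximity" values below are made up for illustration.

```python
import numpy as np

dt = 0.04                                 # e.g. 25 Hz sampling (illustrative)
t = np.arange(0, 4, dt)
proximity = np.sin(np.pi * t)             # made-up proximity time series

speed = np.gradient(proximity, dt)        # 1st derivative: speed
acceleration = np.gradient(speed, dt)     # 2nd derivative: acceleration
jerk = np.gradient(acceleration, dt)      # 3rd derivative: jerk

print(speed.shape, jerk.shape)            # each has the same length as the input
```

For the sinusoid above, the numerical speed should closely match the analytic derivative π·cos(πt) away from the endpoints.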
While integrating over the jerk timeseries has been a popular measure of smoothness, it is affected by the amplitude and duration of movements, which can make comparisons between different movements challenging. A solution is to calculate the dimensionless (squared) jerk. The calculation yields a single (individual-level), positive value. The mathematical underpinnings of this calculation are outside the scope of this notebook. What is important to note here is that the choice of this measure (dimensionless jerk) was based on theoretical reasoning relating to our primary research questions.
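For reference, the dimensionless squared jerk described above has the following standard form (our notation; compare Hogan and Sternad 2009):

$$
\mathrm{DLJ} \;=\; \frac{D^{5}}{A^{2}} \int_{t_{1}}^{t_{2}} j(t)^{2}\,dt,
\qquad
\mathrm{LDLJ} \;=\; \ln(\mathrm{DLJ}),
$$

where $D = t_{2} - t_{1}$ is the movement duration, $A$ the movement amplitude, and $j(t)$ the jerk. Note that sign conventions vary in the literature (LDLJ is sometimes defined with a minus sign so that higher means smoother); the pipeline below stores the natural log of the positive quantity, so higher values indicate jerkier movement.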
Below is the function that calculates the dimensionless squared jerk metric from position data. The measure takes the third derivative of position with respect to time (jerk), squares it, integrates it over the movement duration, and normalizes it by the amplitude of the movement. The log of this value is taken to keep the values in a reasonable range.
Pipeline code step 2, function for calculating dimensionless squared jerk 🚩
Code
import pandas as pd
import os
import numpy as np
from scipy import integrate, interpolate
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
import seaborn as sns

def dimensionless_squared_jerk_from_position(ts, time):
    """
    Calculate the dimensionless squared jerk metric from position data.

    Parameters:
    -----------
    ts : array_like
        Position data points, should be a 1D numpy array
    time : array_like
        Time points corresponding to the ts data, should be a 1D numpy array

    Returns:
    --------
    float
        Dimensionless squared jerk metric or NaN if calculation fails
    """
    try:
        # First check the raw inputs
        ts = np.array(ts, dtype=float)
        time = np.array(time, dtype=float)
        print(f"Original shape - ts: {ts.shape}, time: {time.shape}")
        print(f"NaN count before processing - ts: {np.isnan(ts).sum()}, time: {np.isnan(time).sum()}")

        # Input validation before preprocessing
        if len(ts) != len(time):
            print(f"Error: Arrays must have the same length. ts: {len(ts)}, time: {len(time)}")
            return np.nan
        if len(ts) < 11:  # Minimum length for savgol filter
            print(f"Error: Input arrays too short for savgol window size=11. Length: {len(ts)}")
            return np.nan

        # Check if time has NaNs - we need to fix time first
        if np.isnan(time).any():
            print("Warning: Time array contains NaNs, filling with linear sequence")
            valid_time_mask = ~np.isnan(time)
            if not valid_time_mask.any():
                print("Error: All time values are NaN")
                return np.nan
            # Create a proper time sequence
            time = np.linspace(np.nanmin(time), np.nanmax(time), len(time))

        # Check if input data is all NaN
        if np.isnan(ts).all():
            print("Error: All input values are NaN")
            return np.nan

        # Identify valid data points for interpolation
        valid_mask = ~np.isnan(ts)
        valid_indices = np.where(valid_mask)[0]
        if len(valid_indices) < 2:
            print(f"Error: Not enough valid data points for interpolation. Found {len(valid_indices)}")
            return np.nan

        # Handle the interpolation more carefully
        # 1. Use valid points to interpolate
        if np.isnan(ts).any():
            try:
                # Create interpolator with valid points only
                valid_time = time[valid_mask]
                valid_data = ts[valid_mask]
                # Sort by time to ensure proper interpolation
                sort_idx = np.argsort(valid_time)
                valid_time = valid_time[sort_idx]
                valid_data = valid_data[sort_idx]
                # Create interpolation function
                f = interpolate.interp1d(
                    valid_time, valid_data,
                    bounds_error=False,
                    fill_value=(valid_data[0], valid_data[-1])  # Extrapolate with edge values
                )
                # Apply interpolation
                ts_interpolated = f(time)
                # Check if interpolation fixed all NaNs
                if np.isnan(ts_interpolated).any():
                    print(f"Warning: Interpolation failed to fix all NaNs. Remaining: {np.isnan(ts_interpolated).sum()}")
                    # Last resort: replace any remaining NaNs with nearest valid value
                    ts_interpolated = pd.Series(ts_interpolated).ffill().bfill().values
                ts = ts_interpolated
            except Exception as e:
                print(f"Error during interpolation: {str(e)}")
                return np.nan

        # Verify no NaNs remain after preprocessing
        if np.isnan(ts).any() or np.isnan(time).any():
            print("Error: Failed to remove all NaN values")
            return np.nan

        # Ensure time steps are uniform for derivative calculation
        uniform_time = np.linspace(time[0], time[-1], len(time))
        if not np.allclose(time, uniform_time):
            # If time is not uniform, resample the data
            print("Warning: Time steps not uniform, resampling data")
            # Use scipy's interpolation again to resample
            f = interpolate.interp1d(time, ts, bounds_error=False, fill_value='extrapolate')
            ts = f(uniform_time)
            time = uniform_time

        # Calculate the time step
        dt = np.diff(time)[0]
        if dt <= 0:
            print(f"Error: Time steps must be positive. Got dt={dt}")
            return np.nan

        # Print state after preprocessing
        print(f"Data after preprocessing - Length: {len(ts)}")
        print(f"NaN check after preprocessing - ts: {np.isnan(ts).any()}, time: {np.isnan(time).any()}")
        print(f"Range - ts: {np.min(ts)} to {np.max(ts)}, time: {np.min(time)} to {np.max(time)}")

        # Calculate speed
        speed = np.gradient(ts, dt)
        sns.lineplot(data=speed)
        # Smooth with savgol filter
        speed = savgol_filter(speed, 11, 3)
        # Calculate acceleration
        acceleration = np.gradient(speed, dt)
        # Smooth (maintaining original settings)
        acceleration = savgol_filter(acceleration, 11, 3)
        sns.lineplot(data=acceleration)
        # Calculate jerk
        jerk = np.gradient(acceleration, dt)
        sns.lineplot(data=jerk)
        # Smooth (maintaining original settings)
        jerk = savgol_filter(jerk, 11, 3)

        # Calculate movement duration (D)
        movement_duration = time[-1] - time[0]
        if movement_duration <= 0:
            print(f"Error: Movement duration must be positive. Got {movement_duration}")
            return np.nan

        # Calculate movement amplitude by integrating speed
        position = integrate.simpson(speed, x=time)
        movement_amplitude = abs(position)  # Use absolute value to ensure positive

        # Prevent division by zero
        epsilon = 1e-10
        if movement_amplitude < epsilon:
            print(f"Warning: Movement amplitude very small ({movement_amplitude}). Using epsilon.")
            movement_amplitude = epsilon

        # Calculate the squared jerk
        squared_jerk = jerk ** 2
        # Integrate the squared jerk
        integrated_squared_jerk = integrate.simpson(squared_jerk, x=time)

        # Ensure positive value for integral of squared jerk
        if integrated_squared_jerk < 0:
            print(f"Warning: Negative integral of squared jerk: {integrated_squared_jerk}. Using absolute value.")
            integrated_squared_jerk = abs(integrated_squared_jerk)

        # Calculate the dimensionless squared jerk
        dimensionless_jerk = integrated_squared_jerk * (movement_duration ** 5 / movement_amplitude ** 2)
        return dimensionless_jerk
    except Exception as e:
        print(f"Error calculating dimensionless squared jerk: {str(e)}")
        return np.nan
4.1 Example smoothness results on simulated time series data
In the example below we show how the two smoothness metrics behave for three different time series. The first is a smooth sinusoidal signal, the second is a noisy version of the first, and the third is an even noisier version. The spectral arc length metric should be higher for the smooth signal, while the dimensionless squared jerk metric should be lower for the smooth signal (as it is less jerky).
Example code for checking behavior of the smoothness metrics 🏴
Code
import matplotlib.pyplot as plt
import numpy as np

# Generate the time series data
ts1 = np.arange(5, 10, 0.01)
ts2 = np.arange(5, 10, 0.01) + np.random.normal(0, 0.1, len(ts1))
ts3 = np.arange(5, 10, 0.01) + np.random.normal(0, 0.5, len(ts1))

# Create sinusoidal time series
time_axis = np.arange(5, 10, 0.01)
ts1 = np.sin(ts1)
ts2 = np.sin(ts2)
ts3 = np.sin(ts3)

# Calculate metrics (assuming you have these functions defined)
spects1 = spectral_arclength(ts1, fs=1)
spects2 = spectral_arclength(ts2, fs=1)
spects3 = spectral_arclength(ts3, fs=1)
jerkts1 = dimensionless_squared_jerk_from_position(ts1, time_axis)
jerkts2 = dimensionless_squared_jerk_from_position(ts2, time_axis)
jerkts3 = dimensionless_squared_jerk_from_position(ts3, time_axis)
Original shape - ts: (500,), time: (500,)
NaN count before processing - ts: 0, time: 0
Data after preprocessing - Length: 500
NaN check after preprocessing - ts: False, time: False
Range - ts: -0.9589242746631385 to 0.9999920733059184, time: 5.0 to 9.989999999999894
Original shape - ts: (500,), time: (500,)
NaN count before processing - ts: 0, time: 0
Data after preprocessing - Length: 500
NaN check after preprocessing - ts: False, time: False
Range - ts: -0.9708388487557869 to 0.9999840634408801, time: 5.0 to 9.989999999999894
Original shape - ts: (500,), time: (500,)
NaN count before processing - ts: 0, time: 0
Data after preprocessing - Length: 500
NaN check after preprocessing - ts: False, time: False
Range - ts: -0.9998750306847253 to 0.9999998149768836, time: 5.0 to 9.989999999999894
In the pipeline code for step 2 below, we calculate smoothness for the distance between the two hands, as well as for the distance to the center of mass (COM) and the centroid of the two hands. We also calculate the smoothness of the wrist speed profiles, which are derived from the distance time series data. Variable names containing SPARC refer to the spectral arc length metric, while smoothness refers to the (log) dimensionless squared jerk metric.
Pipeline code step 2 complete procedure 🚩
Code
import matplotlib.pyplot as plt
import seaborn as sns
import tempfile
from scipy import integrate
from scipy import interpolate
from scipy.signal import savgol_filter
import numpy as np
import pandas as pd
import os
import glob

# specifically we might be interested in computing the smoothness of the distance
inputfol = '../dataoutput_STEP1_2_timeseries/'
outputfol = '../dataoutput_STEP2_features/'
metadata = pd.read_csv('../meta/project_pointsibling_metadata_starttimes.csv', encoding='latin1')
constantwindowsize_sec = 100  # we want an equal portion of each timeseries to be analyzed

# function to calculate spectral arc length
def spectral_arclength(movement, fs, padlevel=4, fc=10.0, amp_th=0.05):
    """
    Calculates the smoothness of the given speed profile using the modified
    spectral arc length metric.

    Parameters
    ----------
    movement : np.array
        The array containing the movement speed profile.
    fs : float
        The sampling frequency of the data.
    padlevel : integer, optional
        Indicates the amount of zero padding to be done to the movement
        data for estimating the spectral arc length. [default = 4]
    fc : float, optional
        The max. cut-off frequency for calculating the spectral arc
        length metric. [default = 10.]
    amp_th : float, optional
        The amplitude threshold used for determining the cut-off frequency
        up to which the spectral arc length is to be estimated. [default = 0.05]

    Returns
    -------
    sal : float
        The spectral arc length estimate of the given movement's smoothness.
    (f, Mf) : tuple of two np.arrays
        The frequency (f) and the magnitude spectrum (Mf) of the given
        movement data. This spectrum runs from 0 to fs/2.
    (f_sel, Mf_sel) : tuple of two np.arrays
        The portion of the spectrum that is selected for calculating the
        spectral arc length.

    Notes
    -----
    This is the modified spectral arc length metric, which has been tested
    only for discrete movements. It is suitable for movements that are a few
    seconds long, but for long movements it might be slow and results might
    not make sense (like any other smoothness metric).

    Examples
    --------
    >>> t = np.arange(-1, 1, 0.01)
    >>> move = np.exp(-5*pow(t, 2))
    >>> sal, _, _ = spectral_arclength(move, fs=100.)
    >>> '%.5f' % sal
    '-1.41403'
    """
    # Number of zeros to be padded.
    nfft = int(pow(2, np.ceil(np.log2(len(movement))) + padlevel))
    # Frequency
    f = np.arange(0, fs, fs / nfft)
    # Normalized magnitude spectrum
    Mf = abs(np.fft.fft(movement, nfft))
    Mf = Mf / max(Mf)
    # Indices to choose only the spectrum within the given cut-off frequency fc.
    # NOTE: This is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cut-off for
    # arc length calculation).
    fc_inx = ((f <= fc) * 1).nonzero()
    f_sel = f[fc_inx]
    Mf_sel = Mf[fc_inx]
    # Choose the amplitude threshold based cut-off frequency.
    # Index of the last point on the magnitude spectrum that is greater than
    # or equal to the amplitude threshold.
    inx = ((Mf_sel >= amp_th) * 1).nonzero()[0]
    fc_inx = range(inx[0], inx[-1] + 1)
    f_sel = f_sel[fc_inx]
    Mf_sel = Mf_sel[fc_inx]
    # Calculate arc length
    new_sal = -sum(np.sqrt(pow(np.diff(f_sel) / (f_sel[-1] - f_sel[0]), 2) +
                           pow(np.diff(Mf_sel), 2)))
    return new_sal, (f, Mf), (f_sel, Mf_sel)

# function to calculate the dimensionless squared jerk metric from position data
def dimensionless_squared_jerk_from_position(ts, time):
    """
    Calculate the dimensionless squared jerk metric from position data.

    Parameters:
    -----------
    ts : array_like
        Position data points, should be a 1D numpy array
    time : array_like
        Time points corresponding to the ts data, should be a 1D numpy array

    Returns:
    --------
    float
        Dimensionless squared jerk metric or NaN if calculation fails
    """
    try:
        # First check the raw inputs
        ts = np.array(ts, dtype=float)
        time = np.array(time, dtype=float)
        print(f"Original shape - ts: {ts.shape}, time: {time.shape}")
        print(f"NaN count before processing - ts: {np.isnan(ts).sum()}, time: {np.isnan(time).sum()}")

        # Input validation before preprocessing
        if len(ts) != len(time):
            print(f"Error: Arrays must have the same length. ts: {len(ts)}, time: {len(time)}")
            return np.nan
        if len(ts) < 11:  # Minimum length for savgol filter
            print(f"Error: Input arrays too short for savgol window size=11. Length: {len(ts)}")
            return np.nan

        # Check if time has NaNs - we need to fix time first
        if np.isnan(time).any():
            print("Warning: Time array contains NaNs, filling with linear sequence")
            valid_time_mask = ~np.isnan(time)
            if not valid_time_mask.any():
                print("Error: All time values are NaN")
                return np.nan
            # Create a proper time sequence
            time = np.linspace(np.nanmin(time), np.nanmax(time), len(time))

        # Check if input data is all NaN
        if np.isnan(ts).all():
            print("Error: All input values are NaN")
            return np.nan

        # Identify valid data points for interpolation
        valid_mask = ~np.isnan(ts)
        valid_indices = np.where(valid_mask)[0]
        if len(valid_indices) < 2:
            print(f"Error: Not enough valid data points for interpolation. Found {len(valid_indices)}")
            return np.nan

        # Handle the interpolation more carefully
        # 1. Use valid points to interpolate
        if np.isnan(ts).any():
            try:
                # Create interpolator with valid points only
                valid_time = time[valid_mask]
                valid_data = ts[valid_mask]
                # Sort by time to ensure proper interpolation
                sort_idx = np.argsort(valid_time)
                valid_time = valid_time[sort_idx]
                valid_data = valid_data[sort_idx]
                # Create interpolation function
                f = interpolate.interp1d(
                    valid_time, valid_data,
                    bounds_error=False,
                    fill_value=(valid_data[0], valid_data[-1])  # Extrapolate with edge values
                )
                # Apply interpolation
                ts_interpolated = f(time)
                # Check if interpolation fixed all NaNs
                if np.isnan(ts_interpolated).any():
                    print(f"Warning: Interpolation failed to fix all NaNs. Remaining: {np.isnan(ts_interpolated).sum()}")
                    # Last resort: replace any remaining NaNs with nearest valid value
                    ts_interpolated = pd.Series(ts_interpolated).ffill().bfill().values
                ts = ts_interpolated
            except Exception as e:
                print(f"Error during interpolation: {str(e)}")
                return np.nan

        # Verify no NaNs remain after preprocessing
        if np.isnan(ts).any() or np.isnan(time).any():
            print("Error: Failed to remove all NaN values")
            return np.nan

        # Ensure time steps are uniform for derivative calculation
        uniform_time = np.linspace(time[0], time[-1], len(time))
        if not np.allclose(time, uniform_time):
            # If time is not uniform, resample the data
            print("Warning: Time steps not uniform, resampling data")
            # Use scipy's interpolation again to resample
            f = interpolate.interp1d(time, ts, bounds_error=False, fill_value='extrapolate')
            ts = f(uniform_time)
            time = uniform_time

        # Calculate the time step
        dt = np.diff(time)[0]
        if dt <= 0:
            print(f"Error: Time steps must be positive. Got dt={dt}")
            return np.nan

        # Print state after preprocessing
        print(f"Data after preprocessing - Length: {len(ts)}")
        print(f"NaN check after preprocessing - ts: {np.isnan(ts).any()}, time: {np.isnan(time).any()}")
        print(f"Range - ts: {np.min(ts)} to {np.max(ts)}, time: {np.min(time)} to {np.max(time)}")

        # Calculate speed
        speed = np.gradient(ts, dt)
        # Smooth with savgol filter (maintaining original settings)
        speed = savgol_filter(speed, 11, 3)
        # Calculate acceleration
        acceleration = np.gradient(speed, dt)
        # Smooth (maintaining original settings)
        acceleration = savgol_filter(acceleration, 11, 3)
        # Calculate jerk
        jerk = np.gradient(acceleration, dt)
        # Smooth (maintaining original settings)
        jerk = savgol_filter(jerk, 11, 3)

        # Calculate movement duration (D)
        movement_duration = time[-1] - time[0]
        if movement_duration <= 0:
            print(f"Error: Movement duration must be positive. Got {movement_duration}")
            return np.nan

        # Calculate movement amplitude by integrating speed
        position = integrate.simpson(speed, x=time)
        movement_amplitude = abs(position)  # Use absolute value to ensure positive

        # Prevent division by zero
        epsilon = 1e-10
        if movement_amplitude < epsilon:
            print(f"Warning: Movement amplitude very small ({movement_amplitude}). Using epsilon.")
            movement_amplitude = epsilon

        # Calculate the squared jerk
        squared_jerk = jerk ** 2
        # Integrate the squared jerk
        integrated_squared_jerk = integrate.simpson(squared_jerk, x=time)

        # Ensure positive value for integral of squared jerk
        if integrated_squared_jerk < 0:
            print(f"Warning: Negative integral of squared jerk: {integrated_squared_jerk}. Using absolute value.")
            integrated_squared_jerk = abs(integrated_squared_jerk)

        # Calculate the dimensionless squared jerk
        dimensionless_jerk = integrated_squared_jerk * (movement_duration ** 5 / movement_amplitude ** 2)

        # Final sanity check
        if np.isnan(dimensionless_jerk) or np.isinf(dimensionless_jerk):
            print(f"Warning: Result is {dimensionless_jerk}. Details:")
            print(f"  Movement duration: {movement_duration}")
            print(f"  Movement amplitude: {movement_amplitude}")
            print(f"  Integrated squared jerk: {integrated_squared_jerk}")
            return np.nan

        # log the result
        print(f"Dimensionless squared jerk: {dimensionless_jerk}")
        # take the natural log of the dimensionless jerk
        dimensionless_jerk = np.log(dimensionless_jerk)
        return dimensionless_jerk
    except Exception as e:
        print(f"Error calculating dimensionless squared jerk: {str(e)}")
        return np.nan

# df for smoothness data
newdfcolumns = ['videoID', 'timeadjusted', 'originalsamples', 'adjustedsamples',
                'start_time_analysiswindow', 'end_time_analysiswindow',
                'perc_twopersonsdetected', 'average_com_movementp1', 'average_com_movementp2',
                'smoothness_distancecom', 'SPARC_smoothness_distancecom',
                'smoothness_distancecentroid',
                'smoothness_xy_average_com_p1', 'smoothness_xy_average_com_p2',
                'smoothness_xy_average_centroid_p1', 'smoothness_xy_average_centroid_p2',
                'smoothness_p1_proximity', 'smoothness_p2_proximity']
newdf = pd.DataFrame(columns=newdfcolumns)

# check for each csv file for layer2 data
layer2dat = glob.glob(inputfol + '*.csv')
#print(layer2dat)

# loop over the csv layer 2 data
for vids in layer2dat:
    print(vids)
    # Load the CSV timeseries file
    ts = pd.read_csv(vids)
    # get the features
    videoID = os.path.basename(vids).split('_processed_data_layer2.csv')[0]
    originalsamples = len(ts["time"])
    perc_twopersonsdetected = ts['both_tracked'].sum() / len(ts)
    # check metadata start
    start = metadata[metadata['VIDEO_ID'] == videoID]['start'].values
    print("start time of this video: " + str(start))
    # calculate the average movement of the com for each person
    average_com_movementp1 = np.mean(np.sqrt(ts['com_p1_x'].diff()**2 + ts['com_p1_y'].diff()**2))
    average_com_movementp2 = np.mean(np.sqrt(ts['com_p2_x'].diff()**2 + ts['com_p2_y'].diff()**2))
    # add a time adjusted variable to the dataset
    if start.size > 0:
        # check that the end time is not greater than the last time in the dataset
        if (start[0] + constantwindowsize_sec) > ts['time'].max():
            print("End time is greater than the last time in the dataset, setting end time to max value")
        timeadjusted = ("TRUE - Adjusted to start at + " + str(start[0]) +
                        " with a window length of: " + str(constantwindowsize_sec) + " seconds")
        # Take a timeseries chunk of constantwindowsize_sec seconds
        ts = ts[(ts['time'] >= start[0]) & (ts['time'] <= start[0] + constantwindowsize_sec)]
    if start.size == 0:
        timeadjusted = ("FALSE - Not adjusted to start time as no start time was given for this video, "
                        "window length is still set to: " + str(constantwindowsize_sec) + " seconds")
        ts = ts[(ts['time'] <= constantwindowsize_sec)]
    adjustedsamples = len(ts["time"])
    print(timeadjusted)
    # fps is the modal number of time steps per second
    fps = 1 / (ts['time'].diff().mode()[0])
    # add time start and time end
    start_time_analysiswindow = start[0] if start.size > 0 else 0
    end_time_analysiswindow = start_time_analysiswindow + constantwindowsize_sec
    # calculate the smoothness of the distance between the com and centroid
    smoothness_distancecom = dimensionless_squared_jerk_from_position(ts['distance_com'].values, ts['time'].values)
    # take the derivative of the distance
    distancecomspeed = np.gradient(ts['distance_com'].values, ts['time'].values)
    SPARCsmoothness_distancecom = spectral_arclength(distancecomspeed, 1/fps, padlevel=4, fc=10.0, amp_th=0.05)
    # the SAL value is the first element of the returned tuple
    SPARCsmoothness_distancecom = SPARCsmoothness_distancecom[0]
    #print(smoothness_distancecom)
    smoothness_distancecentroid = dimensionless_squared_jerk_from_position(ts['distance'].values, ts['time'].values)
    # calculate the smoothness of the xy positions for each person
    smoothness_xy_average_com_p1 = (dimensionless_squared_jerk_from_position(ts['com_p1_x'], ts['time'].values) +
                                    dimensionless_squared_jerk_from_position(ts['com_p1_y'], ts['time'].values)) / 2
    smoothness_xy_average_com_p2 = (dimensionless_squared_jerk_from_position(ts['com_p2_x'], ts['time'].values) +
                                    dimensionless_squared_jerk_from_position(ts['com_p2_y'], ts['time'].values)) / 2
    smoothness_xy_average_centroid_p1 = (dimensionless_squared_jerk_from_position(ts['centroid_p1_x'], ts['time'].values) +
                                         dimensionless_squared_jerk_from_position(ts['centroid_p1_y'], ts['time'].values)) / 2
    smoothness_xy_average_centroid_p2 = (dimensionless_squared_jerk_from_position(ts['centroid_p2_x'], ts['time'].values) +
                                         dimensionless_squared_jerk_from_position(ts['centroid_p2_y'], ts['time'].values)) / 2
    # calculate the smoothness of the proximity approach
    smoothness_p1_proximity = dimensionless_squared_jerk_from_position(ts['p1_com_approach_pos'].values, ts['time'].values)
    smoothness_p2_proximity = dimensionless_squared_jerk_from_position(ts['p2_com_approach_pos'].values, ts['time'].values)
    # append to the new df using concat
    newdf = pd.concat([newdf, pd.DataFrame([[videoID, timeadjusted, originalsamples, adjustedsamples,
                                             start_time_analysiswindow, end_time_analysiswindow,
                                             perc_twopersonsdetected,
                                             average_com_movementp1, average_com_movementp2,
                                             smoothness_distancecom, SPARCsmoothness_distancecom,
                                             smoothness_distancecentroid,
                                             smoothness_xy_average_com_p1, smoothness_xy_average_com_p2,
                                             smoothness_xy_average_centroid_p1, smoothness_xy_average_centroid_p2,
                                             smoothness_p1_proximity, smoothness_p2_proximity]],
                                           columns=newdfcolumns)], ignore_index=True)

# save the new df
newdf.to_csv(outputfol + 'smoothness_data.csv', index=False, encoding='latin1')
newdf.head()
# print done
print("Done with smoothness processing pipeline!")
Example output smoothness: 📊
For each video we will now have a smoothness feature set.
Code
import pandas as pd
import glob as glob
import os

# Load the CSV file
csv_file = "./dataoutput_STEP2_features/smoothness_data.csv"
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
4.1.1 Extract features for all videos using tsfresh
To show how easy it is to extract features from the time series data, we use the tsfresh library (Christ et al. 2018). This library implements a large number of features designed to characterize time series data. While we do not use these features in our analysis, this provides a convenient example of extracting features from the created time series in a data-driven way. The user can of course adapt this code to add more features or to extract only a subset of features.
Extracting more features example tsfresh 🏴
Code
import pandas as pd
import glob
import os
from tsfresh import extract_features

# Initialize list to store features from each file
all_features = []
timeseries = glob.glob('./dataoutput_STEP1_2_timeseries/*_processed_data_layer2.csv')

# for checking lets do the first 2 time series
timeseries = timeseries[:2]  # comment this line to process all files

# Process each file individually
for file in timeseries:
    ts_data = pd.read_csv(file)
    video_id = file.split('/')[-1].split('_processed')[0]

    # Clean NaN values for the current file
    distance_data = ts_data['distance_com'].copy()
    distance_data = distance_data.interpolate(method='linear')
    distance_data = distance_data.fillna(method='ffill')
    distance_data = distance_data.fillna(method='bfill')

    # Create DataFrame for current file only
    current_data = pd.DataFrame({
        'id': video_id,
        'time': range(len(distance_data)),
        'distance_com': distance_data
    })

    # Extract features for current file only
    current_features = extract_features(current_data, column_id='id', column_sort='time')

    # Add to our collection
    all_features.append(current_features)

    # Optional: print progress
    print(f"Processed {video_id} - extracted {len(current_features.columns)} features")

    # Clear variables to free memory
    del ts_data, distance_data, current_data, current_features

# Combine all features at the end
combined_features = pd.concat(all_features, ignore_index=True)
print(f"Total: Extracted {len(combined_features.columns)} features from {len(combined_features)} videos")
For each video we will now have a tsfresh feature set.
Code
import pandas as pd
import glob as glob
import os

# Load the CSV file
csv_file = "./dataoutput_STEP2_features/tsfresh_features_fromCOMdistance.csv"
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
For each script we have provided a requirements.txt file. You can install the requirements using pip, by first navigating to the folder where the script is located and then running the following command in your terminal:
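For example (the folder name below is illustrative; navigate to whichever step's folder contains the script you want to run):

```shell
# Go to the folder containing the script, then install its dependencies.
cd code_STEP2_features   # hypothetical folder name; use the step you need
pip install -r requirements.txt
```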
The development of infant-caregiver interaction was studied using Cross-Recurrence Quantification Analysis (CRQA). CRQA is a nonlinear time series analysis technique that can be used to study synchronization and the strength and direction of coupling dynamics between two systems (Marwan and Kurths 2002; Shockley et al. 2002). Specifically, we used CRQA to quantify the strength and direction of coupling between infant and caregiver based on changes in their centre of mass during the 7 lab visits. To assess whether the overall strength of coupling changes over time, we will use a generalized mixed model with the CRQA measure of coupling strength (determinism) as the dependent variable and age and dyad relation as predictors.
Using CRQA we can quantify the extent to which the dynamics of two different systems are coupled over time. To do so, we evaluate whether two systems occupy the same regions in a shared multidimensional phase space, in which each coordinate represents a state and a trajectory represents the evolution of system states over time. We can reconstruct a phase space trajectory from a single observed time series by creating delayed copies that represent so-called surrogate dimensions. Takens' theorem (Takens 1981) guarantees that the reconstructed trajectory is topologically equivalent to the original: the important dynamical properties are recovered, not the exact same trajectory. The procedure involves determining a time lag (embedding delay) to construct the surrogate dimensions (we used the first local minimum of the mutual information function, cf. (Fraser and Swinney 1986)) and determining how many surrogate dimensions are needed (we used False Nearest Neighbor analysis, cf. (Kennel, Brown, and Abarbanel 1992)). We obtained an estimate for the delay and number of embedding dimensions for each COM time series in the data set and used the maximum delay (838) and maximum embedding dimension (3) for all CRQA analyses. For each dyad we now have 2 multivariate time series that can be thought of as trajectories through a reconstructed 3-dimensional phase space (cf. (Hasselman 2022)). The next step is to create a distance matrix in which each cell represents the Euclidean distance between the coordinates of the two systems (see Figure below). The cells that form the main diagonal represent the distance between the states observed at exactly the same point in time. Moving away from the main diagonal, cells represent the distance to a state that occurred at an earlier or later point in time relative to the time series on the rows or columns.
The next step is to choose a threshold value that determines which coordinates should be considered recurring states, yielding a binary matrix called the recurrence matrix, containing only 0s and 1s. For each dyad and measurement occasion, a threshold was chosen that yielded a matrix in which a fixed proportion of cells contained a 1. These cells represent states that are very similar and occur in both time series; their proportion is known as the Recurrence Rate, which we set at 5%. Recurrent states around the main diagonal occurred at approximately the same moment in time; recurrent states below or above the main diagonal occurred earlier (or later) relative to the time series on the X or Y axis.
Many different measures can be calculated from the recurrence matrix; in the present study we focus on determinism (DET), which is the proportion of recurrent points that form diagonal lines relative to the total number of recurrent points. Diagonal lines indicate that a consecutive sequence of states in one time series is repeated in the other time series in almost exactly the same way, or, equivalently, that both systems followed the same trajectory through state space for a while. Determinism is therefore generally considered a measure of the strength of the coupling between the two systems.
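The logic of delay embedding, thresholding at a fixed recurrence rate, and computing determinism can be sketched in a few lines of Python. This is illustrative only — the pipeline itself uses casnet/PyRQA — and the signals, lag, and dimension below are toy choices:

```python
import numpy as np

def delay_embed(x, dim, lag):
    """Reconstruct a trajectory from a 1-D series via time-delayed copies."""
    n = len(x) - (dim - 1) * lag
    return np.column_stack([x[i * lag:i * lag + n] for i in range(dim)])

def cross_recurrence(x, y, dim=3, lag=25, target_rr=0.05):
    """Binary cross-recurrence matrix with a fixed (5%) recurrence rate."""
    X, Y = delay_embed(x, dim, lag), delay_embed(y, dim, lag)
    # Euclidean distance between every state of system 1 and every state of system 2
    D = np.sqrt(((X[:, None, :] - Y[None, :, :]) ** 2).sum(axis=-1))
    radius = np.quantile(D, target_rr)  # threshold chosen to hit the target RR
    return (D <= radius).astype(int)

def determinism(R, lmin=2):
    """DET: share of recurrent points lying on diagonal lines of length >= lmin."""
    total, on_lines = R.sum(), 0
    for k in range(-R.shape[0] + 1, R.shape[1]):
        run = 0
        for v in np.append(np.diagonal(R, offset=k), 0):  # sentinel flushes last run
            if v:
                run += 1
            else:
                if run >= lmin:
                    on_lines += run
                run = 0
    return on_lines / total if total else np.nan

rng = np.random.default_rng(1)
t = np.linspace(0, 8 * np.pi, 400)
x = np.sin(t) + 0.1 * rng.standard_normal(len(t))        # system 1
y = np.sin(t + 0.5) + 0.1 * rng.standard_normal(len(t))  # coupled, phase-shifted system 2
R = cross_recurrence(x, y)
print(R.mean())        # realised recurrence rate, close to the 0.05 target
print(determinism(R))  # high for these strongly coupled signals
```

Because the threshold is the 5% quantile of the distance distribution, the recurrence rate is fixed by construction, mirroring the fixed-RR approach used in the pipeline.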
Figure S8. Cross-recurrence plot. The two time series P1 and P2 are delay-embedded using a lag of 838 to create three surrogate dimensions. For each timepoint in P1, a distance is calculated to every other time point in P2 and vice versa. A threshold value is chosen to create a binary cross-recurrence matrix with 5% recurrent points. All measures are calculated based on the line structures (done with the R package casnet; Hasselman 2025).
CRQA analysis in R 🚩
CRQA Analysis
Fred Hasselman
2025-04-27
Authors
An Open-source Standardized Pipeline for Equitable Observations of Interactive Behavioral Dynamics: Theory-driven Measurement, Analysis, and Archiving
Arkadiusz Białek1, Wim Pouw2, 3, Travis J. Wiltshire2, James Trujillo4, Fred Hasselman5, Babajide Alamu Owoyele6, Natalia Siekiera1 & Joanna Rączaszek-Leonardi7
1 Institute of Psychology, Jagiellonian University 2 Department of Cognitive Science & Artificial Intelligence, Tilburg University 3 Donders Institute for Brain, Cognition and Behaviour, Radboud University Nijmegen 4 Institute for Logic, Language and Computation, University of Amsterdam 5 Behavioural Science Institute, Radboud University Nijmegen 6 Artificial Intelligence and Intelligent Systems, Hasso Plattner Institute Potsdam 7 Human Interactivity and Language Lab, Faculty of Psychology, University of Warsaw
# Load packages
library(rio)
library(plyr)
library(tidyverse)
library(casnet) # If you have not installed it: devtools::install_github("FredHasselman/casnet")
library(invctr)
library(fpCompare)
library(gplots)
library(testthat)
library(report)

# Paths to files
path_repo    <- "/Users/fred/GitHub/InterPerDynPipeline"
path_meta    <- file.path(path_repo, "meta")
path_oridata <- file.path(path_repo, "dataoutput_STEP1_2_timeseries")
# path_seldata <- file.path(path_repo, "code_STEP3_nonlinear_analysis")
path_results <- file.path(path_repo, "dataoutput_STEP3_nonlinearstatistics")
Prepare data
The code below records the steps taken to create the time series used for analysis:
For each dyad and session we look up which time points to select for analysis (info in the meta subfolder)
After selection based on the time code, all time series will be padded to length 7501 and are saved in dataoutput_STEP3_nonlinearstatistics
Figures of before and after the selection procedure are saved as [dyad]_selected_data.pdf in dataoutput_STEP3_nonlinearstatistics
Diagnostic data about the selection procedure is saved as [dyad]_dataselection_info.csv in dataoutput_STEP3_nonlinearstatistics
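Before the full R implementation, the selection-and-padding logic can be sketched in Python. This is illustrative only; the column names follow the layer-2 files, while `start`, `stop`, and the 50 Hz sampling rate are assumptions (150 s at 50 Hz gives the 7501 samples mentioned above):

```python
import numpy as np
import pandas as pd

def select_and_pad(df, start, stop, fs=50.0, ts_length=150.0):
    """Keep samples with start <= time <= stop, then align them to a fixed
    time grid of ts_length seconds; grid points without data stay NaN."""
    grid = start + np.arange(0.0, ts_length + 1e-9, 1.0 / fs)  # 7501 points at 50 Hz
    seg = df[(df["time"] >= start) & (df["time"] <= stop)]
    out = pd.DataFrame({"time": grid})
    # nearest-neighbour alignment within half a sample period
    return pd.merge_asof(out, seg, on="time",
                         tolerance=0.5 / fs, direction="nearest")

# toy layer-2-like series: data ends at 100 s, so the window tail is NaN-padded
df = pd.DataFrame({"time": np.arange(0, 100, 0.02),
                   "p1_com_approach_pos": np.sin(np.arange(0, 100, 0.02))})
padded = select_and_pad(df, start=10.0, stop=90.0)
print(len(padded))  # 7501 rows: a 150 s window sampled at 50 Hz
```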
# Ages
meta_data <- rio::import(file.path(path_meta, "project_point_metadata_ages.csv"))
# Data ranges to select
meta_times <- rio::import(file.path(path_meta, "project_point_metadata_starttimes_seconds.csv"))

# Select data segments using file metadata_starttimes
meta_times$exclude_start[is.na(meta_times$exclude_start)] <- 0.00
meta_times$exclude_end[is.na(meta_times$exclude_end)] <- 0.00
rio::export(meta_times, file.path(path_meta, "project_point_metadata_starttimes_seconds.csv"))

# Get file list
data_files <- list.files(path = path_oridata, pattern = "point_")

# Time series length (s)
tsLength <- 150

# Subjects
subjects <- meta_data$subject_nr

for(p in subjects){
  this_subject <- data_files[grep(pattern = paste0("(", p, "_)+"), x = data_files)]

  # Plot the data and write to PDF
  pdf(file.path(path_results, paste0(p, "_selected_data.pdf")), paper = "a4r")
  res_data <- list()

  for(t in seq_along(this_subject)){
    this_occassion <- rio::import(file.path(path_oridata, this_subject[t]))

    # Select cols
    colIDs <- which(colnames(this_occassion) %in% c("time", "p1_com_approach_pos", "p2_com_approach_pos"))
    data <- this_occassion |> select(all_of(colIDs))

    framediffs_raw <- NCOL(ftable(diff(data$time)))
    framediffs <- as.data.frame(ftable(round_trunc(diff(data$time), 2)))
    colnames(framediffs) <- c("frameDiffRounded", "frequency")
    framediffs_round <- NROW(framediffs)

    plotTS_multi(data[,-1], title = p, subtitle = paste0("ORI: ", this_subject[t]))

    # Snip?
    snipIDs <- meta_times[grep(x = meta_times$VIDEO_ID, pattern = gsub(pattern = "_processed_data_layer2.csv", x = this_subject[t], replacement = "")), 4:5]
    if(all(snipIDs %>>% 0)){
      data[data$time %>=% snipIDs$exclude_start & data$time %<=% snipIDs$exclude_end, 2:3] <- NA
      # snipIDs$exclude_start <- data$time[data$time %>=% snipIDs$exclude_start][2]
      # data <- data |> filter(!(time %>=% snipIDs$exclude_start & time %<=% (snipIDs$exclude_end)))
    }

    # Select segment
    rowIDs <- meta_times[grep(x = meta_times$VIDEO_ID, pattern = gsub(pattern = "_processed_data_layer2.csv", x = this_subject[t], replacement = "")), 2:3]
    if(rowIDs$stop %<<% (rowIDs$start + tsLength)){
      # minVal <- min(c(data[rowIDs$start:rowIDs$stop, 2], data[rowIDs$start:rowIDs$stop, 3]), na.rm = TRUE)
      pad <- (rowIDs$start + tsLength) - rowIDs$stop
      tmp_time <- seq(rowIDs$start, (rowIDs$start + tsLength), by = 0.02)
      tmp_data <- data |> filter(time %>=% rowIDs$start, time %<=% rowIDs$stop)
      tmp_p1_com_approach_pos <- ts_trimfill(x = tmp_time, tmp_data[,2], padding = NA)$y
      tmp_p2_com_approach_pos <- ts_trimfill(x = tmp_time, tmp_data[,3], padding = NA)$y
      data <- data.frame(time = tmp_time,
                         p1_com_approach_pos = tmp_p1_com_approach_pos,
                         p2_com_approach_pos = tmp_p2_com_approach_pos)
      rm(tmp_data, tmp_time, tmp_p1_com_approach_pos, tmp_p2_com_approach_pos)
    } else {
      rowIDs$start <- data$time[data$time %>=% rowIDs$start][1]
      rowIDs$stop <- (rowIDs$start + tsLength)
      rowIDs$stop <- last(data$time[data$time %<=% rowIDs$stop])
      data <- data |> filter(time %>=% rowIDs$start, time %<=% rowIDs$stop)
    }

    data$p1_com_diff <- c(0, diff(data$p1_com_approach_pos))
    data$p2_com_diff <- c(0, diff(data$p2_com_approach_pos))

    occ <- strsplit(x = this_subject[t], split = "_")[[1]][3]

    res_data[[t]] <- data.frame(file = this_subject[t],
                                subject = p,
                                session = occ,
                                tsN_before = NROW(this_occassion),
                                tsN_after = NROW(data),
                                tsN_diff = 7501 - NROW(data),
                                framediffs,
                                framediffsN_raw = framediffs_raw,
                                framediffsN_round = framediffs_round)

    # if(NROW(data) < 7501){
    #   rdif <- 7501 - NROW(data)
    #   tmp_data <- data.frame(time = rep(NA, rdif),
    #                          p1_com_approach_pos = rep(NA, rdif),
    #                          p2_com_approach_pos = rep(NA, rdif),
    #                          p1_com_diff = rep(NA, rdif),
    #                          p2_com_diff = rep(NA, rdif))
    #   data <- rbind(data, tmp_data)
    #   rm(tmp_data)
    # }

    plotTS_multi(data[,-1], title = p, subtitle = paste0("SEL: ", this_subject[t]))

    rio::export(data, file = file.path(path_results, paste0(p, "_", "session_", occ, ".csv")))
  }

  rio::export(ldply(res_data), file.path(path_results, paste0(p, "_dataselection_info.csv")))
  dev.off()
}
An example of output:
CRQA
Estimate Embedding Parameters
Parameters were estimated using lagged Mutual Information and False Nearest Neighbour analysis by using the function est_parameters() in package casnet:
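As an illustration of the first criterion (not the pipeline code — the pipeline uses casnet's est_parameters(), which also runs the False Nearest Neighbour analysis), a histogram-based lagged mutual information curve and its first local minimum can be computed as:

```python
import numpy as np

def lagged_mi(x, max_lag=120, bins=16):
    """Histogram estimate of the mutual information between x(t) and x(t + lag)."""
    x = (x - x.min()) / (x.max() - x.min())
    mi = []
    for lag in range(1, max_lag + 1):
        pxy, _, _ = np.histogram2d(x[:-lag], x[lag:], bins=bins)
        pxy = pxy / pxy.sum()
        px, py = pxy.sum(axis=1), pxy.sum(axis=0)
        nz = pxy > 0  # avoid log(0)
        mi.append(float((pxy[nz] * np.log(pxy[nz] / np.outer(px, py)[nz])).sum()))
    return np.array(mi)

def first_local_minimum(v):
    """Lag (starting at 1) of the first local minimum of the MI curve, else None."""
    for i in range(1, len(v) - 1):
        if v[i] < v[i - 1] and v[i] < v[i + 1]:
            return i + 1
    return None

t = np.linspace(0, 20 * np.pi, 2000)  # ~200 samples per oscillation
x = np.sin(t)
emLag = first_local_minimum(lagged_mi(x))
print(emLag)  # candidate embedding delay
```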
# Estimate parameters ----
data_files <- list.files(path = path_results, pattern = "(point\\_)(\\d)+(\\_session\\_)(\\d)\\.csv")

for(p in subjects){
  this_subject <- data_files[grep(pattern = paste0("(", p, "_)+"), x = data_files)]

  # Write figures to PDF
  pdf(file.path(path_results, paste0(p, "_parameters.pdf")))
  paramlist_p1 <- paramlist_p2 <- list()

  for(t in seq_along(this_subject)){
    data <- rio::import(file.path(path_results, this_subject[t]))
    paramlist_p1[[t]] <- est_parameters(data$p1_com_diff, maxDim = 5, doPlot = FALSE, silent = TRUE)
    paramlist_p2[[t]] <- est_parameters(data$p2_com_diff, maxDim = 5, doPlot = FALSE, silent = TRUE)
  }
  dev.off()

  saveRDS(object = list(p1 = paramlist_p1, p2 = paramlist_p2),
          file = file.path(path_results, paste0(p, "_", "parameters.rds")))
}

# After inspection: Using the global.minimum as emLag, all optimal emDim are maximally 3
# Collect the parameter estimates and write to file
param_data <- ldply(subjects, function(p){
  tmpL <- readRDS(file = file.path(path_results, paste0(p, "_", "parameters.rds")))
  names(tmpL$p1) <- paste0("session_", 1:length(tmpL$p1))
  tmp_p1 <- ldply(tmpL$p1, function(d){d$optimData |> filter(emLag.method == "global.minimum", emDim == 3)} |> mutate(subject = "p1"))
  names(tmpL$p2) <- paste0("session_", 1:length(tmpL$p2))
  tmp_p2 <- ldply(tmpL$p2, function(d){d$optimData |> filter(emLag.method == "global.minimum", emDim == 3)} |> mutate(subject = "p2"))
  return(rbind(tmp_p1, tmp_p2))
})
rio::export(param_data, file.path(path_results, "embedding_parameters_all.csv"))

knitr::opts_chunk$set(fig.align = "left")
An example of output:
Cross-RQA
CRQA analyses were conducted using the maximum estimated lag (838) and 3 embedding dimensions; the target value for the Recurrence Rate was 0.05 (5% fixed RR).
The CRQA results are calculated using rqa_par() in the package casnet, which uses (some of) the massively parallel and memory-saving algorithms implemented in the Python library PyRQA.
The R output is captured and saved as a figure [dyad]_CRQAresults.pdf in dataoutput_STEP3_nonlinearstatistics
The results are saved as an R data file for each dyad as crqa_maxlag_[dyad]_[Nsessions]sessions.Rds in dataoutput_STEP3_nonlinearstatistics
data_files <- list.files(path = path_results, pattern = "(point\\_)(\\d)+(\\_session\\_)(\\d)\\.csv")
param_data <- rio::import(file.path(path_results, "embedding_parameters_all.csv"))
meta_data <- rio::import(file.path(path_meta, "project_point_metadata_ages.csv"))
meta_times <- rio::import(file.path(path_meta, "project_point_metadata_starttimes_seconds.csv"))

subjects <- meta_data$subject_nr

emDim <- 3
emLag_max <- max(param_data$emLag)
emLag_median <- median(param_data$emLag)

for(p in subjects){
  crqaOut <- list()
  this_subject <- data_files[grep(pattern = paste0("(", p, "_)+"), x = data_files)]

  # Write results to PDF
  pdf(file.path(path_results, paste0(p, "_CRQAresults.pdf")), paper = "a4r")

  for(t in seq_along(this_subject)){
    occ <- strsplit(x = this_subject[t], split = "_|[.]")[[1]][4]
    data <- rio::import(file = file.path(path_results, paste0(p, "_", "session_", occ, ".csv")))

    # CRQA using massively parallel computing and bit encoding
    crqaOut[[t]] <- rqa_par(y1 = data$p1_com_diff,
                            y2 = data$p2_com_diff,
                            emDim = emDim,
                            emLag = emLag_max,
                            emRad = NA,
                            targetValue = .05,
                            anisotropyHV = TRUE,
                            AUTO = FALSE,
                            silent = FALSE,
                            doPlot = FALSE)

    outTable <- attributes(crqaOut[[t]])$measuresTable
    gplots::textplot(
      testthat::capture_output({
        cat(paste(this_subject[t], "\n\n\n"))
        cat(" Global Measures\n")
        print(format(outTable$`Global Measures`, digits = 3))
        cat(paste("\n\n Line-based Measures\n"))
        print(format(outTable$`Line-based Measures`, digits = 3))
        cat(paste("\n\n Horizontal/Vertical line anisotropy\n"))
        print(format(outTable$`Horizontal/Vertical line anisotropy`, digits = 3))
      }), cex = 0.8
    )
  }
  dev.off()

  saveRDS(object = crqaOut, file = file.path(path_results, paste0("crqa_maxlag_", p, "_", t, "sessions.Rds")))
  rm(crqaOut)
}
An example of a Distance matrix:
An example of a Distance matrix thresholded to yield 5% recurrent points:
An example of output:
This code collects the results generated in the previous section into 1 file called crqa_results_all.csv.
In order to run the more complex CRQA analysis described above, you can open the file ./code_STEP3_nonlinear_analysis/STEP3_nonlinearfeatures_CRQA.Rmd in R Studio.
Before running the markdown file you will need to make sure you have a few packages installed (run this in R or R Studio):

install.packages(c("rio", "plyr", "tidyverse", "invctr", "fpCompare", "gplots", "testthat"))
devtools::install_github("FredHasselman/casnet")
6 Step 4: Statistical analysis (R)
6.1 Step 4.1: Statistical report on smoothness analysis in R
For the individual level analysis of the smoothness of the individual approach/avoidance behavior, we fit a linear mixed model using lme4 (Bates et al. 2015) where the individual smoothness metric was the outcome variable, culture (Yurakaré or Polish) was the predictor variable, and we included a random intercept to account for individuals being nested within sibling pairs. Overall, the model suggests there was no evidence of an effect of culture on the smoothness of individual approach/avoidance movements (β = 0.12, 95% CI [−0.53, 0.77], marginal R² = .004). For the Polish group, the estimated marginal mean was 38.3 (SE = 0.45, 95% CI [37.3, 39.2]). For the Yurakaré group, the estimated marginal mean was 39.5 (SE = 0.45, 95% CI [37.6, 49.5]). The figure below shows the overall pattern. To evaluate whether the results of these models were biased by singular fit issues in estimating the random effects, we ran an equivalent linear regression with cluster-robust standard error estimates (McNeish, Stapleton, and Silverman 2017) using the clubSandwich (Pustejovsky, Pekofsky, and Zhang 2025) R package, which provided compatible results.
For the dyadic distance between center of mass analyses, we fit a simple linear regression with the smoothness of the distance between the dyad members’ center of mass as the outcome variable and culture (Yurakaré vs Polish) as the predictor variable. Consistent with the prior analyses, we did not observe a significant effect for culture (β = 0.15, 95% CI [−0.81, 1.11], R² = .006). The estimated marginal mean for the Polish group was 39.0 (SE = 0.64, 95% CI [37.7, 40.3]), and for the Yurakaré group it was 39.3 (SE = 0.64, 95% CI [38.0, 40.6]).
Figure S9. Cross-cultural Comparisons of Individual Smoothness of Approach/Avoidance Behaviors (A) and Dyadic Smoothness in Proximity of Individuals’ Center of Mass (B)
Detailed smoothness analysis in R 🚩
Step 4 Smoothness Analyses
Travis J. Wiltshire
2025-06-13
Cross-Cultural Data Analysis
Analysis of the Jerkiness of Proximity/Approach/Avoidance Measures
Loading the data here and creating some extra variables
data <- read.csv("../dataoutput_STEP2_features/smoothness_data.csv")
data_cross_culture <- data[1:30, ]

# Add a culture grouping variable
data_cross_culture$culture <- ifelse(grepl("cop_y", data_cross_culture$videoID), "Yurakare",
                              ifelse(grepl("cop_p", data_cross_culture$videoID), "Polish", NA))

# Restructure data into long format
library(dplyr)
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library(tidyr)

data_long <- data_cross_culture %>%
  filter(!grepl("stab", videoID, ignore.case = TRUE)) %>%
  pivot_longer(cols = c(smoothness_p1_proximity, smoothness_p2_proximity),
               names_to = "person",
               values_to = "jerkiness_value")

# Create a separate data frame to check for stability
yurakare_stab_test <- data_cross_culture[data_cross_culture$culture == "Yurakare", ]
yurakare_stab_test$is_stab <- as.factor(grepl("stab", yurakare_stab_test$videoID, ignore.case = TRUE))
yurakare_stab_test_long <- yurakare_stab_test %>%
  pivot_longer(cols = c(smoothness_p1_proximity, smoothness_p2_proximity),
               names_to = "person",
               values_to = "jerkiness_value")

# Remove the stab tests here
data_cross_culture <- data_cross_culture %>%
  filter(!grepl("stab", videoID, ignore.case = TRUE))
Stability Test
The results below show that the stability correction for the Yurakare videos does not have an effect on the estimated jerkiness values.
library(lme4)
## Loading required package: Matrix
##
## Attaching package: 'Matrix'
## The following objects are masked from 'package:tidyr':
##
## expand, pack, unpack
library(lmerTest)
##
## Attaching package: 'lmerTest'
## The following object is masked from 'package:lme4':
##
## lmer
## The following object is masked from 'package:stats':
##
## step
# Simplest model with random intercept
model_stab <- lmer(jerkiness_value ~ is_stab + (1 | videoID), data = yurakare_stab_test_long)
## boundary (singular) fit: see help('isSingular')
summary(model_stab)
## Linear mixed model fit by REML. t-tests use Satterthwaite's method [
## lmerModLmerTest]
## Formula: jerkiness_value ~ is_stab + (1 | videoID)
## Data: yurakare_stab_test_long
##
## REML criterion at convergence: 171.4
##
## Scaled residuals:
## Min 1Q Median 3Q Max
## -1.9877 -0.7048 -0.1491 0.4426 2.9370
##
## Random effects:
## Groups Name Variance Std.Dev.
## videoID (Intercept) 0.000 0.000
## Residual 4.552 2.133
## Number of obs: 40, groups: videoID, 20
##
## Fixed effects:
## Estimate Std. Error df t value Pr(>|t|)
## (Intercept) 39.1440 0.4770 38.0000 82.054 <2e-16 ***
## is_stabTRUE 0.8212 0.6746 38.0000 1.217 0.231
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## Correlation of Fixed Effects:
## (Intr)
## is_stabTRUE -0.707
## optimizer (nloptwrap) convergence code: 0 (OK)
## boundary (singular) fit: see help('isSingular')
Jerkiness linear mixed effects models
# library(lme4)
# library(lmerTest)

# Simplest model with random intercept
model1 <- lmer(jerkiness_value ~ culture + (1 | videoID), data = data_long)
summary(model1)
## Linear mixed model fit by REML. t-tests use Satterthwaite's method [
## lmerModLmerTest]
## Formula: jerkiness_value ~ culture + (1 | videoID)
## Data: data_long
##
## REML criterion at convergence: 150.8
##
## Scaled residuals:
## Min 1Q Median 3Q Max
## -1.48263 -0.68271 0.02591 0.74656 1.81783
##
## Random effects:
## Groups Name Variance Std.Dev.
## videoID (Intercept) 1.087 1.043
## Residual 1.823 1.350
## Number of obs: 40, groups: videoID, 20
##
## Fixed effects:
## Estimate Std. Error df t value Pr(>|t|)
## (Intercept) 39.3203 0.4470 18.0000 87.964 <2e-16 ***
## cultureYurakare -0.1763 0.6322 18.0000 -0.279 0.783
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## Correlation of Fixed Effects:
## (Intr)
## cultureYrkr -0.707
library(ggplot2)

ggplot(data_long, aes(x = culture, y = jerkiness_value, fill = culture)) +
  geom_violin(alpha = 0.7, trim = FALSE) +
  geom_boxplot(width = 0.2, outlier.shape = NA, alpha = 0.6) +  # Adds boxplot inside violin
  geom_jitter(width = 0.2, alpha = 0.5) +
  theme_minimal() +
  labs(title = "Comparison of Jerkiness in Proximity Between Cultures",
       y = "Jerkiness Value", x = "Culture") +
  scale_fill_manual(values = c("Yurakare" = "#A6A24F", "Polish" = "#E07A5F"))
# save the plot
ggsave("../images/smoothness_proximity_culture_comparison.png", width = 8, height = 6, dpi = 300)
Double check model performance
library(performance)
plot(check_model(model1))
## For confidence bands, please install `qqplotr`.
library(report)
report(model1)
## We fitted a linear mixed model (estimated using REML and nloptwrap optimizer)
## to predict jerkiness_value with culture (formula: jerkiness_value ~ culture).
## The model included videoID as random effect (formula: ~1 | videoID). The
## model's total explanatory power is substantial (conditional R2 = 0.38) and the
## part related to the fixed effects alone (marginal R2) is of 2.73e-03. The
## model's intercept, corresponding to culture = Polish, is at 39.32 (95% CI
## [38.41, 40.23], t(36) = 87.96, p < .001). Within this model:
##
## - The effect of culture [Yurakare] is statistically non-significant and
## negative (beta = -0.18, 95% CI [-1.46, 1.11], t(36) = -0.28, p = 0.782; Std.
## beta = -0.11, 95% CI [-0.87, 0.66])
##
## Standardized parameters were obtained by fitting the model on a standardized
## version of the dataset. 95% Confidence Intervals (CIs) and p-values were
## computed using a Wald t-distribution approximation.
Center of Mass Distance Analyses
# This doesn't need the long dataset
dis_model <- lm(smoothness_distancecom ~ culture, data = data_cross_culture)
summary(dis_model)
##
## Call:
## lm(formula = smoothness_distancecom ~ culture, data = data_cross_culture)
##
## Residuals:
## Min 1Q Median 3Q Max
## -1.7370 -1.0689 -0.2204 0.7895 2.8297
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 39.1571 0.4085 95.853 <2e-16 ***
## cultureYurakare 0.3340 0.5777 0.578 0.57
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## Residual standard error: 1.292 on 18 degrees of freedom
## Multiple R-squared: 0.01823, Adjusted R-squared: -0.03631
## F-statistic: 0.3343 on 1 and 18 DF, p-value: 0.5703
ggplot(data_cross_culture, aes(x = culture, y = smoothness_distancecom, fill = culture)) +
  geom_violin(alpha = 0.7, trim = FALSE) +
  geom_boxplot(width = 0.2, outlier.shape = NA, alpha = 0.6) +  # Adds boxplot inside violin
  geom_jitter(width = 0.2, alpha = 0.5) +
  theme_minimal() +
  labs(title = "Comparison of Jerkiness in Distance Between Center of Mass",
       y = "Jerkiness Value", x = "Culture") +
  scale_fill_manual(values = c("Yurakare" = "#A6A24F", "Polish" = "#E07A5F"))
# save the plot
ggsave("../images/smoothness_distancecom_culture_comparison.png", width = 8, height = 6, dpi = 300)
Cluster Robust Standard Errors
The models above yield singular fits, with random-effect variances near zero. Here we use a cluster-robust standard error estimator to check that the results above are not driven by the random effects structure.
library(clubSandwich)
## Warning: package 'clubSandwich' was built under R version 4.3.3
## Registered S3 method overwritten by 'clubSandwich':
## method from
## bread.mlm sandwich
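The idea behind a cluster-robust (sandwich) standard error can be sketched in Python with a hand-rolled CR0 estimator on simulated data (the notebook's own check uses clubSandwich in R; names and values below are illustrative, not the study's):

```python
import numpy as np

rng = np.random.default_rng(7)
n_dyads, per = 20, 2                              # 20 dyads, 2 members each
cluster = np.repeat(np.arange(n_dyads), per)
culture = np.repeat(np.arange(n_dyads) % 2, per)  # 0 = group A, 1 = group B (balanced)
y = 39 + 0.2 * culture + rng.normal(0, 1.5, n_dyads * per)

X = np.column_stack([np.ones_like(y), culture])   # intercept + group dummy
beta = np.linalg.lstsq(X, y, rcond=None)[0]       # OLS point estimates
u = y - X @ beta                                  # residuals

# CR0 cluster-robust covariance: bread @ meat @ bread, clustered on dyad
bread = np.linalg.inv(X.T @ X)
meat = np.zeros((X.shape[1], X.shape[1]))
for g in np.unique(cluster):
    Xg, ug = X[cluster == g], u[cluster == g]
    meat += Xg.T @ np.outer(ug, ug) @ Xg
se = np.sqrt(np.diag(bread @ meat @ bread))
print(se)  # one cluster-robust standard error per coefficient
```

Clustering the "meat" of the sandwich on dyad allows residuals within a dyad to be correlated, which is the same concern the random intercept addresses in the mixed models above.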
ggplot(data_cross_culture, aes(x = culture, y = SPARC_smoothness_distancecom, fill = culture)) +
  geom_violin(alpha = 0.7, trim = FALSE) +
  geom_boxplot(width = 0.2, outlier.shape = NA, alpha = 0.6) +  # Adds boxplot inside violin
  geom_jitter(width = 0.2, alpha = 0.5) +
  theme_minimal() +
  labs(title = "Comparison of SPARC in Distance Between Center of Mass",
       y = "Jerkiness Value", x = "Culture") +
  scale_fill_manual(values = c("Yurakare" = "#A6A24F", "Polish" = "#E07A5F"))
# save the plot
ggsave("../images/smoothness_sparc_culture_comparison.png", width = 8, height = 6, dpi = 300)
Equivalence Testing
Here we want to interpret some of the null results further. The t-tests above indicate that we did not observe a difference between the groups; this equivalence analysis can indicate whether or not the groups are approximately equivalent.
library(TOSTER)
## Warning: package 'TOSTER' was built under R version 4.3.3
## Proximity
m1 = mean(data_long$jerkiness_value[data_long$culture == "Polish"], na.rm = TRUE)
m2 = mean(data_long$jerkiness_value[data_long$culture == "Yurakare"], na.rm = TRUE)
sd1 = sd(data_long$jerkiness_value[data_long$culture == "Polish"], na.rm = TRUE)
sd2 = sd(data_long$jerkiness_value[data_long$culture == "Yurakare"], na.rm = TRUE)

# Using .2 as an effect size here as we don't have prior experience with this measure and it's a small effect size
tsum_TOST(m1 = m1, m2 = m1, sd1 = sd1, sd2 = sd1, n1 = 10, n2 = 10,
          low_eqbound = -0.2, high_eqbound = 0.2, alpha = 0.05, var.equal = TRUE)
##
## Two Sample t-test
##
## The equivalence test was non-significant, t(18) = -0.237, p = 4.08e-01
## The null hypothesis test was non-significant, t(18) = 0.000, p = 1e+00
## NHST: don't reject null significance hypothesis that the effect is equal to zero
## TOST: don't reject null equivalence hypothesis
##
## TOST Results
## t df p.value
## t-test 0.0000 18 1.000
## TOST Lower 0.2372 18 0.408
## TOST Upper -0.2372 18 0.408
##
## Effect Sizes
## Estimate SE C.I. Conf. Level
## Raw 0 0.8433 [-1.4624, 1.4624] 0.9
## Hedges's g 0 0.4472 [-0.7044, 0.7044] 0.9
## Note: SMD confidence intervals are an approximation. See vignette("SMD_calcs").
# COM
m3 = mean(data_cross_culture$smoothness_distancecom[data_cross_culture$culture == "Polish"], na.rm = TRUE)
m4 = mean(data_cross_culture$smoothness_distancecom[data_cross_culture$culture == "Yurakare"], na.rm = TRUE)
sd3 = sd(data_cross_culture$smoothness_distancecom[data_cross_culture$culture == "Polish"], na.rm = TRUE)
sd4 = sd(data_cross_culture$smoothness_distancecom[data_cross_culture$culture == "Yurakare"], na.rm = TRUE)
# Using .2 as an effect size here as we don't have prior experience with this measure and it's a small effect size
tsum_TOST(m1 = m3, m2 = m4, sd1 = sd3, sd2 = sd4, n1 = 10, n2 = 10,
          low_eqbound = -0.2, high_eqbound = 0.2, alpha = 0.05, var.equal = TRUE)
##
## Two Sample t-test
##
## The equivalence test was non-significant, t(18) = -0.184, p = 5.72e-01
## The null hypothesis test was non-significant, t(18) = -0.459, p = 6.51e-01
## NHST: don't reject null significance hypothesis that the effect is equal to zero
## TOST: don't reject null equivalence hypothesis
##
## TOST Results
## t df p.value
## t-test -0.4593 18 0.651
## TOST Lower -0.1843 18 0.572
## TOST Upper -0.7344 18 0.236
##
## Effect Sizes
## Estimate SE C.I. Conf. Level
## Raw -0.3340 0.7272 [-1.595, 0.9269] 0.9
## Hedges's g -0.1967 0.4485 [-0.9005, 0.5125] 0.9
## Note: SMD confidence intervals are an approximation. See vignette("SMD_calcs").
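The two one-sided tests (TOST) logic that tsum_TOST implements can be illustrated from summary statistics alone. Below is a minimal Python sketch of the equal-variance, raw-units version (toy numbers; this is not TOSTER's exact routine, which also reports standardized effect sizes):

```python
import math
from scipy.stats import t as tdist

def tost_two_sample(m1, m2, sd1, sd2, n1, n2, low, high):
    """Two one-sided tests for equivalence of a raw mean difference."""
    df = n1 + n2 - 2
    sp2 = ((n1 - 1) * sd1**2 + (n2 - 1) * sd2**2) / df  # pooled variance
    se = math.sqrt(sp2 * (1 / n1 + 1 / n2))
    diff = m1 - m2
    t_lower = (diff - low) / se    # H0: diff <= low bound
    t_upper = (diff - high) / se   # H0: diff >= high bound
    p_lower = tdist.sf(t_lower, df)
    p_upper = tdist.cdf(t_upper, df)
    # equivalence is claimed only if BOTH one-sided tests reject,
    # so the TOST p-value is the larger of the two
    return diff, max(p_lower, p_upper)

diff, p = tost_two_sample(5.0, 5.0, 1.0, 1.0, 10, 10, -0.2, 0.2)
print(diff, p)
```

With identical group means but only n = 10 per group and narrow bounds of ±0.2, the TOST p-value stays well above .05: the data are too noisy to conclude equivalence, mirroring the "don't reject null equivalence hypothesis" outcome above.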
Longitudinal Data Analysis
Loading the data and merging with metadata
data2 <- data[31:length(data$videoID), ]

# Load metadata file
ages <- read.csv("../meta/project_point_metadata_ages.csv")

# Merge these files
library(stringr)

# Ensure columns are character
ages$subject_nr <- as.character(ages$subject_nr)
data2$videoID <- as.character(data2$videoID)

# Extract relevant information from `videoID`
library(purrr) # Needed for map_chr()
data2 <- data2 %>%
  mutate(
    T_value = str_split(videoID, "_") %>% map_chr(~ .x[3]), # Extracts third element after split
    T_value = paste0("T", T_value),                         # Converts to "T1", "T2", etc.
    point_match = str_extract(videoID, "point_\\d+")        # Extracts "point_X" for matching
  )

# Extract "point_X" from subject_nr (to match gender)
ages <- ages %>%
  mutate(point_match = str_extract(subject_nr, "point_\\d+"))

# Reshape `ages` from wide to long format, keeping age in days and months
ages_long <- ages %>%
  pivot_longer(
    cols = matches("age_T[1-7]_(days|mo)"), # Selects columns like age_T1_days, age_T2_mo
    names_to = c("T_value", "Age_Type"),
    names_pattern = "age_(T[1-7])_(.+)",    # Extracts "T1", "T2", etc., and type (days/mo)
    values_to = "Age_Value"
  ) %>%
  pivot_wider(names_from = Age_Type, values_from = Age_Value) # Reshape back for separate days/mo columns

# Merge `data2` with `ages_long` using both `T_value` and `point_match`
data2_augmented <- data2 %>%
  left_join(ages_long, by = c("point_match", "T_value")) # Merge on "point_X" and "T1-T7"

# Add time variable
data2_augmented <- data2_augmented %>%
  mutate(time = as.numeric(str_extract(T_value, "\\d+")))
Growth models for longitudinal mass distance analysis
model_long_lin  <- lmer(smoothness_distancecom ~ days + (1 | subject_nr), data = data2_augmented)
model_long_quad <- lmer(smoothness_distancecom ~ days + I(days^2) + (1 | subject_nr), data = data2_augmented)
model_long_cub  <- lmer(smoothness_distancecom ~ days + I(days^2) + I(days^3) + (1 | subject_nr), data = data2_augmented)
#summary(model_long)

# Alternative plotting method
comp_perf <- compare_performance(model_long_lin, model_long_quad, model_long_cub)
print(plot(comp_perf))
## Linear mixed model fit by REML. t-tests use Satterthwaite's method [
## lmerModLmerTest]
## Formula: smoothness_distancecom ~ days + (1 | subject_nr)
## Data: data2_augmented
##
## REML criterion at convergence: 364.2
##
## Scaled residuals:
## Min 1Q Median 3Q Max
## -1.3668 -0.5731 -0.2088 0.2153 4.3433
##
## Random effects:
## Groups Name Variance Std.Dev.
## subject_nr (Intercept) 0.3878 0.6228
## Residual 6.9483 2.6360
## Number of obs: 74, groups: subject_nr, 13
##
## Fixed effects:
## Estimate Std. Error df t value Pr(>|t|)
## (Intercept) 43.1339537 1.5370384 71.6348386 28.063 <2e-16 ***
## days -0.0009904 0.0046030 69.1567962 -0.215 0.83
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## Correlation of Fixed Effects:
## (Intr)
## days -0.973
ggplot(data2_augmented, aes(x = days, y = smoothness_distancecom, color = factor(subject_nr))) +
  geom_point(alpha = 0.6) + # Scatter plot with transparency for better visualization
  geom_smooth(method = "lm", aes(group = 1), color = "black", linetype = "dashed") + # Overall regression line
  labs(title = "Effect of Age on Jerkiness in Center of Mass Distance",
       x = "Age in # of Days",
       y = "Smoothness Distance",
       color = "Subject ID") + # Add a label for the color legend
  theme_minimal() +
  scale_color_viridis_d() # Use a color scale that's distinct for each subject
## `geom_smooth()` using formula = 'y ~ x'
# save the plot
ggsave("../images/smoothness_distancecom_longitudinal.png", width = 8, height = 6, dpi = 300)
## `geom_smooth()` using formula = 'y ~ x'
ggplot(data2_augmented, aes(x = days, y = smoothness_distancecom, color = factor(subject_nr), group = subject_nr)) +
  geom_point(alpha = 0.6) + # Scatter plot with transparency for better visualization
  geom_line(alpha = 0.6) +  # Add lines for each subject
  geom_smooth(method = "lm", aes(group = 1), color = "black", linetype = "dashed") + # Overall regression line
  labs(title = "Effect of Age on Jerkiness in Center of Mass Distance",
       x = "Age in # of Days",
       y = "Smoothness Distance",
       color = "Subject ID") + # Add a label for the color legend
  theme_minimal() +
  scale_color_viridis_d() # Use a color scale that's distinct for each subject
## `geom_smooth()` using formula = 'y ~ x'
# save the plot
ggsave("../images/smoothness_distancecom_longitudinal_lines.png", width = 8, height = 6, dpi = 300)
## `geom_smooth()` using formula = 'y ~ x'
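The compare_performance step above ranks the linear, quadratic, and cubic growth models. The underlying idea can be sketched with plain OLS polynomial fits and AIC; the data below are simulated (an inverted-U trajectory), not the infant dataset:

```python
import numpy as np

rng = np.random.default_rng(0)
days = np.linspace(80, 400, 74)
# simulate an inverted-U developmental trajectory plus noise
y = 40 + 0.05 * days - 0.0001 * days**2 + rng.normal(scale=1.0, size=days.size)

def ols_aic(x, y, degree):
    """Gaussian AIC (up to an additive constant) for a polynomial OLS fit."""
    coefs = np.polyfit(x, y, degree)
    rss = np.sum((y - np.polyval(coefs, x)) ** 2)
    n, k = y.size, degree + 2  # coefficients + intercept + error variance
    return n * np.log(rss / n) + 2 * k

aics = {d: ols_aic(days, y, d) for d in (1, 2, 3)}
print(aics)
```

Because the simulated trajectory is quadratic, the degree-2 model attains a lower AIC than the linear model: the drop in residual sum of squares outweighs the penalty for the extra parameter.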
Orthogonal polynomials growth models
This part was added because of the differing scales of the age variables; orthogonal polynomials avoid the numerical problems that highly collinear raw polynomial terms can cause.
data2_augmented <- data2_augmented[!is.na(data2_augmented$days), ] #CHECKBYWIM
orth_model_lin  <- lmer(smoothness_distancecom ~ poly(days, 1) + (1 | subject_nr), data = data2_augmented)
orth_model_quad <- lmer(smoothness_distancecom ~ poly(days, 2) + (1 | subject_nr), data = data2_augmented)
orth_model_cub  <- lmer(smoothness_distancecom ~ poly(days, 3) + (1 | subject_nr), data = data2_augmented)
plot(compare_performance(orth_model_lin, orth_model_quad, orth_model_cub))
## Random effect variances not available. Returned R2 does not account for random effects.
## Random effect variances not available. Returned R2 does not account for random effects.
## Random effect variances not available. Returned R2 does not account for random effects.
## Variables of class `logical` can't be rescaled and remain unchanged.
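The motivation for poly() is collinearity: raw days and days² are almost perfectly correlated when days is large and positive, whereas orthogonal polynomial terms are uncorrelated by construction. A quick NumPy check with illustrative values (QR on the Vandermonde matrix is similar in spirit to what R's poly() computes):

```python
import numpy as np

days = np.linspace(80, 400, 74)

# raw polynomial terms are nearly collinear for positive ages
r_raw = np.corrcoef(days, days**2)[0, 1]

# orthogonalized polynomial terms via QR of the Vandermonde matrix
# columns of V are [1, d, d^2]; Q's columns are mutually orthogonal
Q, _ = np.linalg.qr(np.vander(days, 3, increasing=True))
r_orth = np.corrcoef(Q[:, 1], Q[:, 2])[0, 1]

print(round(r_raw, 4), round(r_orth, 10))
```

Because the first Q column is the (scaled) constant vector, the remaining columns have mean zero and zero mutual correlation, which keeps the fixed-effects design well conditioned.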
ggplot(data2_augmented, aes(x = days, y = smoothness_p1_proximity, color = factor(subject_nr), group = subject_nr)) +
  geom_point(alpha = 0.6) + # Scatter plot with transparency for better visualization
  geom_line(alpha = 0.6) +  # Add lines for each subject
  geom_smooth(method = "lm", aes(group = 1), color = "black", linetype = "dashed") + # Overall regression line
  labs(title = "Effect of Age on Jerkiness in Infant Proximity",
       x = "Age in # of Days",
       y = "Smoothness Distance",
       color = "Subject ID") + # Add a label for the color legend
  theme_minimal() +
  scale_color_viridis_d() # Use a color scale that's distinct for each subject
## `geom_smooth()` using formula = 'y ~ x'
# save the plot
ggsave("../images/smoothness_p1_proximity_longitudinal_lines.png", width = 8, height = 6, dpi = 300)
## `geom_smooth()` using formula = 'y ~ x'
Testing a GAMM
library(mgcv)
## Loading required package: nlme
##
## Attaching package: 'nlme'
## The following object is masked from 'package:lme4':
##
## lmList
## The following object is masked from 'package:dplyr':
##
## collapse
## This is mgcv 1.8-42. For overview type 'help("mgcv-package")'.
# Fit the GAMM
model_gamm <- gamm(
  smoothness_distancecom ~ s(days),  # Smooth function for days
  random = list(subject_nr = ~1),    # Random effect for subject_nr
  data = data2_augmented,
  na.action = na.omit                # Handle missing data
)

# Summarize the GAMM model
summary(model_gamm$gam)
plot(model_gamm$gam, pages = 1, main = "Smooth Effect of Days")
# Get model predictions
predictions <- predict(model_gamm$gam, newdata = data2_augmented, type = "response")

# Add predictions to the original data frame
data2_augmented$predictions <- predictions

# Plot original data and GAMM model predictions
ggplot(data2_augmented, aes(x = days, y = smoothness_distancecom)) +
  geom_point(alpha = 0.5, color = "blue") +        # Scatter plot for the original data
  geom_line(aes(y = predictions), color = "red") + # Model predictions (smooth line)
  labs(
    title = "GAMM: Smoothness Distance vs. Days",
    x = "Days",
    y = "Smoothness Distance",
    caption = "Blue points: Original data | Red line: Model predictions"
  ) +
  theme_minimal() +
  theme(legend.position = "none")
# save the plot
ggsave("../images/smoothness_distancecom_gamm_predictions.png", width = 8, height = 6, dpi = 300)
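Under the hood, s(days) in mgcv is a penalized regression spline. A bare-bones analogue can be written in a few lines of NumPy, assuming a cubic truncated-power basis and a simple ridge penalty (illustrative only; mgcv uses a more refined smoothness penalty and estimates the penalty weight from the data):

```python
import numpy as np

rng = np.random.default_rng(2)
x = np.sort(rng.uniform(0, 1, 100))
y = np.sin(2 * np.pi * x) + rng.normal(scale=0.3, size=x.size)

# design matrix: intercept, linear term, and cubic truncated-power basis functions
knots = np.linspace(0.1, 0.9, 8)
B = np.column_stack([np.ones_like(x), x] +
                    [np.clip(x - k, 0, None) ** 3 for k in knots])

# ridge-penalized least squares: minimize ||y - B b||^2 + lam * ||b||^2
lam = 1e-4
A = B.T @ B + lam * np.eye(B.shape[1])
b = np.linalg.solve(A, B.T @ y)
fit = B @ b

mse = float(np.mean((y - fit) ** 2))
print(mse)
```

The penalty weight lam trades off fit against wiggliness: lam → 0 interpolates the noise, while a large lam shrinks the spline toward a straight line.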
Growth models for longitudinal SPARC mass distance analysis
sparc_model_long_lin <- lmer(SPARC_smoothness_distancecom ~ days + (1 | subject_nr), data = data2_augmented)
## boundary (singular) fit: see help('isSingular')
sparc_model_long_quad <- lmer(SPARC_smoothness_distancecom ~ days + I(days^2) + (1 | subject_nr), data = data2_augmented)
## boundary (singular) fit: see help('isSingular')
sparc_model_long_cub <- lmer(SPARC_smoothness_distancecom ~ days + I(days^2) + I(days^3) + (1 | subject_nr), data = data2_augmented)
## boundary (singular) fit: see help('isSingular')
## Random effect variances not available. Returned R2 does not account for random effects.
## Random effect variances not available. Returned R2 does not account for random effects.
## Random effect variances not available. Returned R2 does not account for random effects.
6.2 Step 4.2: Statistical report on non-linear analysis in R
To analyze the development of parent-infant interactions, we fit a logistic (generalized) mixed model using lme4 (Bates et al. 2015), predicting the determinism (DET) measure obtained from CRQA for each dyad at each measurement occasion. The age of the infant (in days) was used as the time predictor and was centered on the average age of the sample at the first measurement occasion. The framework of generalized mixed models allows for the estimation of proportion data based on counts, where the denominator may vary from case to case. In the present dataset, 6 dyads had 1 time series that was shorter than expected based on the selection criterion of 150 seconds (6 out of 86 dyadic series). Time series length will affect the number of recurrent points that can be detected; therefore, we used the number of recurrent points detected as a weight variable in the model (see supplementary data for details). A linear and a quadratic effect of the time variable (age in days) were included in the model, as well as an interaction with a categorical variable indicating the parent-infant relation: mother-daughter (N=6), mother-son (N=5), father-daughter (N=1), with the latter as the reference category. The dyad ID was entered as a random effect in the model. The model predictions are displayed in the figure below. The estimated marginal means at 92.6 days (the average at the first measurement occasion) for the father-daughter dyads (M = .39, 95% CI [.36, .42]) did not differ from the mother-daughter dyads (M = .36, 95% CI [.35, .37]) or from the mother-son dyads (M = .40, 95% CI [.39, .41]), but the mother-daughter and mother-son dyads did differ from one another (OR = 0.86, z ratio = -3.96, p_Tukey = 0.002). All interaction effects of the dyad type with the linear and quadratic age effects were significant (see Table S1 below for the results of this model).
Figure S10. CRQA results plot
Table S1
Fixed Effects for Generalized Linear Mixed Model Predicting N_dlp/RP_N
| Predictor | β | SE | z | p |
|---|---|---|---|---|
| Intercept | -0.485 | 0.031 | -15.59 | < .001 |
| Gender (Girls) | -0.141 | 0.041 | -3.47 | < .001 |
| Age (Linear) | 0.403 | 0.003 | 153.65 | < .001 |
| Age (Quadratic) | -0.568 | 0.003 | -172.69 | < .001 |
| Gender × Age (Linear) | -0.174 | 0.004 | -42.27 | < .001 |
| Gender × Age (Quadratic) | 0.346 | 0.005 | 73.28 | < .001 |
Note. Model: binomial family with logit link. N = 86 observations, 13 subjects.
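The key modeling move here, treating DET as a proportion backed by counts with varying denominators, corresponds to a binomial GLM with the number of recurrent points as weights. A minimal iteratively reweighted least squares (IRLS) sketch in Python shows the idea; the data are simulated (not the dyad dataset) and the variable names are illustrative:

```python
import numpy as np

rng = np.random.default_rng(3)

# simulate proportions with varying denominators (e.g., counts of recurrent points)
n_obs = 400
x = rng.normal(size=n_obs)
n_trials = rng.integers(100, 500, size=n_obs)  # denominators vary per observation
true_beta = np.array([-0.5, 0.4])
p = 1 / (1 + np.exp(-(true_beta[0] + true_beta[1] * x)))
successes = rng.binomial(n_trials, p)
prop = successes / n_trials

X = np.column_stack([np.ones(n_obs), x])
beta = np.zeros(2)
# IRLS for a binomial GLM with logit link; trial counts enter as weights
for _ in range(25):
    eta = X @ beta
    mu = 1 / (1 + np.exp(-eta))
    w = n_trials * mu * (1 - mu)             # working weights
    z = eta + (prop - mu) / (mu * (1 - mu))  # working response
    beta = np.linalg.solve(X.T @ (w[:, None] * X), X.T @ (w * z))

print(beta)  # should be close to the true values (-0.5, 0.4)
```

Observations with more recurrent points receive proportionally larger working weights, which is exactly why unequal time series lengths are handled gracefully by the weighted binomial formulation.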
Detailed CRQA statistical analysis in R 🚩
CRQA Analysis
Fred Hasselman
2025-04-27
# Load packages
library(rio)
library(plyr)
library(tidyverse)
library(casnet) # If you have not installed it: devtools::install_github("FredHasselman/casnet")
library(invctr)
library(fpCompare)
library(gplots)
library(testthat)
library(report)
library(emmeans)
library(sjPlot)

# Paths to files
path_repo    <- "../"
path_meta    <- file.path(path_repo, "meta")
path_oridata <- file.path(path_repo, "dataoutput_STEP1_2_timeseries")
path_results <- file.path(path_repo, "dataoutput_STEP3_nonlinearstatistics")
Descriptive
Because ages vary between measurement occasions, we center all ages on the sample mean at the first session.
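The centering step can be expressed in a couple of lines of pandas; the data frame below uses toy values and illustrative column names:

```python
import pandas as pd

crqa_data = pd.DataFrame({
    "subject_nr": [1, 1, 2, 2, 3, 3],
    "session":    [1, 2, 1, 2, 1, 2],
    "age_days":   [88, 118, 94, 124, 91, 122],
})

# center all ages on the mean age at the first measurement occasion
mean_s1 = crqa_data.loc[crqa_data["session"] == 1, "age_days"].mean()
crqa_data["age_days_cS1"] = crqa_data["age_days"] - mean_s1

print(mean_s1)
```

Centering on the first-session mean makes the model intercept interpretable as the predicted value at the start of the study, rather than at an (impossible) age of zero days.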
# save figure
ggsave(file.path(path_repo, "images/crqa_determinism_by_dyad.png"), width = 10, height = 7, dpi = 300)

# For the report we need the model fitted on the actual proportion; the form N_dlp/RP_N doesn't work
g1.7a <- glmer(DET ~ dyad_fs * poly(age_days_cS1_u, 2, raw = TRUE) + (1 | subject_nr_f),
               weights = RP_N, family = binomial, data = crqa_data)
Fixed Effects for Generalized Linear Mixed Model Predicting N_dlp/RP_N
| effect | term | estimate | std.error | statistic | p.value | conf.low | conf.high |
|---|---|---|---|---|---|---|---|
| fixed | (Intercept) | -0.505 | 0.067 | -7.563 | 0.000 | -0.636 | -0.374 |
| fixed | dyad_fsm-d | -0.082 | 0.071 | -1.150 | 0.250 | -0.222 | 0.058 |
| fixed | dyad_fsm-s | 0.046 | 0.073 | 0.626 | 0.532 | -0.098 | 0.189 |
| fixed | poly(age_days_cS1, 2)1 | 0.345 | 0.006 | 60.043 | 0.000 | 0.334 | 0.356 |
| fixed | poly(age_days_cS1, 2)2 | -0.642 | 0.007 | -90.609 | 0.000 | -0.656 | -0.628 |
| fixed | dyad_fsm-d:poly(age_days_cS1, 2)1 | -0.238 | 0.006 | -38.166 | 0.000 | -0.250 | -0.226 |
| fixed | dyad_fsm-s:poly(age_days_cS1, 2)1 | -0.541 | 0.006 | -84.010 | 0.000 | -0.553 | -0.528 |
| fixed | dyad_fsm-d:poly(age_days_cS1, 2)2 | 0.477 | 0.008 | 63.508 | 0.000 | 0.462 | 0.492 |
| fixed | dyad_fsm-s:poly(age_days_cS1, 2)2 | 0.221 | 0.007 | 29.554 | 0.000 | 0.207 | 0.236 |
> **Random Effects:**
> - Subject (Intercept) SD: 0.067
> - Model: binomial family with logit link. N = 86 observations, 13 dyads.
>
> **Model Summary (g1.7a):**
> - **Model type:** Generalized linear mixed model (binomial family with logit link)
> - **AIC:** 1321052
> - **BIC:** 1321076
> - **Log-likelihood:** -660515.8
>
> **Interpretation:** The model revealed significant effects of dyad type and age on determinism measures. Both linear and quadratic age effects were significant, with significant interactions between dyad type and age trends. The father-daughter dyad serves as the reference category in the model.
The open-source MaskAnyone toolbox (see DIY below) facilitates secure and reproducible de-identification of video data, which is crucial for behavioral analysis, by removing personally identifiable information to varying degrees while preserving movement dynamics. Built with React, FastAPI (Ramírez, n.d.), and Docker, this modular platform offers various configurable de-identification algorithms (blurring, suppression, silhouette rendering) and integrates segmentation models like SAM, YOLOv8, and OpenPose/MediaPipe. Accessible via web and CLI, MaskAnyone supports scalable de-identification and privacy validation for researchers with varying technical expertise.
Our masking approach equips researchers to make context-sensitive masking decisions, which may involve varying levels of required reproducibility, or constraints such as cultural sensitivity and participant protection (see Table S2). Indeed, we think masking protocols should be applied through culturally aligned decision-making rather than indiscriminately. In our cross-cultural case study comparing Yurakaré sibling interactions with Polish lab sessions, we adopted masking strategies informed by participant consent, cultural and legal advisors, and recording flexibility, from which we learned that the notions of data being “identifiable” or “sensitive” may differ across communities and require nuance. Below is a video that shows the masking procedure we opted for.
The table below summarizes the three masking strategies that we explored.
Table S2: MaskAnyone Strategy Elements (Hiding Approach)
| Strategy | Description | Effectiveness | Illustrative Results |
|---|---|---|---|
| Blur | Applies Gaussian blurring at a user-controllable blur intensity level. | Moderate; may be limited depending on blur intensity. | Progressively blurred versions maintaining color and general structure |
| Pixellation | Reduces image resolution by grouping pixels into uniform blocks, creating a mosaic effect. | Moderate to high, depending on block size. | Blocky, mosaic-like appearance with preserved general shapes |
| Contours | Uses Canny edge detection to preserve essential contours while eliminating finer details. User-controlled detail. | Moderate; balances privacy with utility. | Black and white line drawings showing only structural boundaries |
Note. This table presents the three primary hiding strategies implemented in the MaskAnyone system, detailing their technical approach, privacy effectiveness, and expected visual outcomes.
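For intuition, the pixellation strategy amounts to replacing each block of pixels by its mean. A NumPy sketch on a toy grayscale "frame" illustrates the idea (MaskAnyone's actual implementation is more configurable and operates on full video):

```python
import numpy as np

def pixelate(frame: np.ndarray, block: int) -> np.ndarray:
    """Replace each block x block tile with its mean value (mosaic effect)."""
    h, w = frame.shape
    assert h % block == 0 and w % block == 0, "toy version: exact tiling only"
    # reshape into (rows of tiles, tile height, cols of tiles, tile width)
    tiles = frame.reshape(h // block, block, w // block, block)
    means = tiles.mean(axis=(1, 3))
    # expand each tile mean back to full resolution
    return np.kron(means, np.ones((block, block)))

frame = np.arange(64, dtype=float).reshape(8, 8)  # toy 8x8 grayscale frame
masked = pixelate(frame, 4)
print(masked.shape)
```

Larger block sizes discard more identity-relevant detail while keeping coarse spatial structure, which is why effectiveness in Table S2 is listed as depending on block size.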
7.1 Ethical Considerations: Need for Integration in Research Ethics Protocols and Informed Consent
Ethical archiving answers calls to waste fewer resources in research and to increase reusability, reproducibility, and the ability of research integrity entities to assess research quality (Resnik, Antes, and Mozersky 2024). Indeed, as the latter case shows, masking is relevant even when data are not openly shared; it also enables more data archiving, while lowering the chance of privacy leaks (e.g., when data are breached).
Effective masking requires ongoing dialogue with research participants and communities. We recommend establishing clear consent protocols that explain the masking options to participants. Institutions also need to regularly review privacy adequacy as technology evolves, and to maintain the flexibility to adjust approaches based on participant feedback. The goal is not merely technical anonymization, but culturally respectful privacy protection that enables meaningful research while honoring participant dignity and community values.
DIY: Trying masking yourself 🛠️
In earlier work on the masking project we utilized a lightweight masking approach using MediaPipe (Lugaresi et al. 2019) called MaskedPiper (B. Owoyele et al. 2022). We have recently provided an updated and enriched Python notebook that allows for masking single-person videos in several ways.
MaskAnyone is the most recent release version of the masking project (B. A. Owoyele et al. 2024). The MaskAnyone code is available on GitHub (MaskAnyone), and an installation guide is available on the GitHub page.
MaskAnyone 2.0 example
8 References
Araujo, Steven S., Kevin E. Munoz, Loberlly N. Salazar, and Boris X. Vintimilla. 2025. “Detecting and Characterizing Group Interactions Using 3D Spatial Data to Enhance Human-Robot Engagement.” In Proceedings of the 2025 3rd International Conference on Robotics, Control and Vision Engineering, 26–34. RCVE ’25. New York, NY, USA: Association for Computing Machinery. https://doi.org/10.1145/3747393.3747398.
Balasubramanian, Sivakumar, Alejandro Melendez-Calderon, Agnes Roby-Brami, and Etienne Burdet. 2015. “On the Analysis of Movement Smoothness.” Journal of NeuroEngineering and Rehabilitation 12 (1): 112. https://doi.org/10.1186/s12984-015-0090-9.
Bates, Douglas, Martin Mächler, Ben Bolker, and Steve Walker. 2015. “Fitting Linear Mixed-Effects Models Using Lme4.” Journal of Statistical Software 67 (October): 1–48. https://doi.org/10.18637/jss.v067.i01.
Cao, Zhe, Gines Hidalgo, Tomas Simon, Shih-En Wei, and Yaser Sheikh. 2021. “OpenPose: Realtime Multi-Person 2D Pose Estimation Using Part Affinity Fields.” IEEE Transactions on Pattern Analysis and Machine Intelligence 43 (1): 172–86. https://doi.org/10.1109/TPAMI.2019.2929257.
Christ, Maximilian, Nils Braun, Julius Neuffer, and Andreas W. Kempa-Liehr. 2018. “Time Series FeatuRe Extraction on Basis of Scalable Hypothesis Tests (Tsfresh – A Python Package).” Neurocomputing 307 (September): 72–77. https://doi.org/10.1016/j.neucom.2018.03.067.
Fraser, Andrew M., and Harry L. Swinney. 1986. “Independent Coordinates for Strange Attractors from Mutual Information.” Physical Review A 33 (2): 1134–40. https://doi.org/10.1103/PhysRevA.33.1134.
Giulietti, Nicola, Davide Todesca, Marco Carnevale, and Hermes Giberti. 2025. “A Real-Time Human Pose Measurement System for Human-In-The-Loop Dynamic Simulators.” IEEE Access 13: 24954–69. https://doi.org/10.1109/ACCESS.2025.3538332.
Hacking, Ian. 1983. Representing and Intervening: Introductory Topics in the Philosophy of Natural Science. Cambridge: Cambridge University Press. https://doi.org/10.1017/CBO9780511814563.
Hasselman, Fred. 2022. “Early Warning Signals in Phase Space: Geometric Resilience Loss Indicators From Multiplex Cumulative Recurrence Networks.” Frontiers in Physiology 13: 859127. https://doi.org/10.3389/fphys.2022.859127.
———. 2025. Casnet: A Toolbox for Studying Complex Adaptive Systems and NETworks [Version 0.5.1]. Manual.
Hogan, Neville, and Dagmar Sternad. 2009. “Sensitivity of Smoothness Measures to Movement Duration, Amplitude and Arrests.” Journal of Motor Behavior 41 (6): 529–34. https://doi.org/10.3200/35-09-004-RC.
Jocher, Glenn, Ayush Chaurasia, and Jing Qiu. 2023. “YOLO by Ultralytics.”
Kennel, Matthew B., Reggie Brown, and Henry D. I. Abarbanel. 1992. “Determining Embedding Dimension for Phase-Space Reconstruction Using a Geometrical Construction.” Physical Review A 45 (6): 3403–11. https://doi.org/10.1103/PhysRevA.45.3403.
Lugaresi, Camillo, Jiuqiang Tang, Hadon Nash, Chris McClanahan, Esha Uboweja, Michael Hays, Fan Zhang, et al. 2019. “MediaPipe: A Framework for Building Perception Pipelines.” arXiv. https://doi.org/10.48550/arXiv.1906.08172.
McNeish, Daniel, Laura M. Stapleton, and Rebecca D. Silverman. 2017. “On the Unnecessary Ubiquity of Hierarchical Linear Modeling.” Psychological Methods 22 (1): 114–40. https://doi.org/10.1037/met0000078.
Miao, G. Q., J. Trujillo, L. S. Bulls, M. A. Thornton, R. Dale, and W. Pouw. 2025. “DIMS Dashboard for Exploring Dynamic Interactions and Multimodal Signals.” In Proceedings of the 47th Annual Conference of the Cognitive Science Society, edited by A. Ruggeri, D. Barner, C. Walker, and N. Bramley. San Francisco, CA: Cognitive Science Society.
Owoyele, Babajide Alamu, Martin Schilling, Rohan Sawahn, Niklas Kaemer, Pavel Zherebenkov, Bhuvanesh Verma, Wim Pouw, and Gerard de Melo. 2024. “MaskAnyone Toolkit: Offering Strategies for Minimizing Privacy Risks and Maximizing Utility in Audio-Visual Data Archiving.” https://doi.org/10.48550/ARXIV.2408.03185.
Owoyele, Babajide, James Trujillo, Gerard de Melo, and Wim Pouw. 2022. “Masked-Piper: Masking Personal Identities in Visual Recordings While Preserving Multimodal Information.” SoftwareX 20 (December): 101236. https://doi.org/10.1016/j.softx.2022.101236.
Pustejovsky, James E., Samuel Pekofsky, and Jingru Zhang. 2025. “clubSandwich: Cluster-Robust (Sandwich) Variance Estimators with Small-Sample Corrections.”
Ramírez, Sebastián. n.d. “FastAPI.”
Resnik, David B., Alison Antes, and Jessica Mozersky. 2024. “Should Researchers Destroy Audio or Video Recordings?” Ethics & Human Research 46 (2): 30–35. https://doi.org/10.1002/eahr.500205.
Takens, Floris. 1981. “Detecting Strange Attractors in Turbulence.” Book Section. In Dynamical Systems and Turbulence, Warwick 1980, edited by David Rand and Lai-Sang Young, 898:366–81. Lecture Notes in Mathematics. Springer Berlin / Heidelberg.
Virtanen, Pauli, Ralf Gommers, Travis E. Oliphant, Matt Haberland, Tyler Reddy, David Cournapeau, Evgeni Burovski, Pearu Peterson, Warren Weckesser, Jonathan Bright, et al. 2020. “SciPy 1.0: Fundamental Algorithms for Scientific Computing in Python.” Nature Methods 17: 261–72. https://doi.org/10.1038/s41592-019-0686-2.
Winter, David A. 2009. Biomechanics and Motor Control of Human Movement. John Wiley & Sons.
Source Code
---title: "An Open-source Standardized Pipeline for Equitable Observations of Interactive Behavioral Dynamics"subtitle: "Theory-driven Measurement, Analysis, and Archiving"date: "`r format(Sys.time(), '%B %d, %Y')`"author: - name: "Arkadiusz Białek*" affiliation: Jagiellonian University, Poland - name: "Wim Pouw*" affiliation: Tilburg University, Netherlands - name: James Trujillo affiliation: University of Amsterdam, Netherlands - name: Fred Hasselman affiliation: Radboud University, Netherlands - name: Babajide Alamu Owoyele affiliation: Hasso Plattner Institute, University of Potsdam, Germany - name: Natalia Siekiera affiliation: Jagiellonian University, Poland - name: Joanna Rączaszek-Leonardi affiliation: University of Warsaw, Poland - name: Travis J. Wiltshire affiliation: Tilburg University, Netherlandscontact: - name: Wim Pouw email: w.pouw@tilburguniversity.edubibliography: quarto_dependencies/references/refs.bibtheme: journalcss: quarto_dependencies/styles/styles.cssformat: html: toc: true toc-location: left toc-title: "Contents" toc-depth: 3 number-sections: true code-fold: true code-tools: trueengine: knitr---\* Shared first author.# OverviewTo increase reproducibility of our approach we have designed this Quarto notebook that provides the user with what is effectively a manual-style extended methods and results section for two cases that follow the pipeline descrbed in Bialek, Pouw et al (under review). Next to staging and (visually) explaining key Python and R code elements, the notebook contains detailed further “Do It Yourself” (DIY) instructions that can be accessed on demand by clicking on DIY sections, for example providing the technical steps needed to install and then run code for YOLO tracking on (user-defined) data. 
This notebook is therefore not simply an add-on supplemental text, but it is a key part of our contribution as it allows for newcomers to social signal processing to start implementing this pipeline step by step.The Quarto notebook is structured to have the following components, next to traditional text sections:- 🚩 **pipeline code** that overview complete code associated with the pipeline, or shorter code chunks to, for example, explain a particular function. These code blocks are generally not, for the sake of efficiency, run inside the current notebook. Denoted with the symbol 🚩.- 🏴 **other code chunks** that we use for this quarto notebook to create sanity checks or animations related to measures or summarizing datasets. These code blocks are generally run inside the notebook. Denoted with the symbol 🏴.- 📊 **example output** that shows example output of the pipeline. Denoted with the symbol 📊.- 🛠️ **DIY blocks** that provide shorter, step-by-step links to the full code for executing a particular step in the pipeline on your own data. Denoted with the symbol 🛠️.- ⬇️ **dataset download** provide information about how to download the dataset overviewed for our report. Denoted with the symbol ⬇️.**What the Quarto notebook is not intended to do:** This notebook is not itself the pipeline (i.e., a collection of code that runs the complete pipeline). It is rather a linear step-by-step manual and explainer of how to understand and implement the pipeline steps. For actually applying this code, the user can further dig into the **DIY** blocks, assembling the pieces that they wish to use, and adjusting as needed. ### What skills do I need?In principle if you know how to run a python or R script you should be able to reproduce the pipeline on your own data. However, being able to run something should rather be the start of learning about the things that are covered in this pipeline. 
To fully understand the pipeline some programming skills in python and R are helpful, especially if you want to adjust things flexibly to your own data. Then some basic understanding of common time series operations, like differentiation, and smoothing, and general kinematic variables is valuable background (e.g., check the first few chapters of @winterBiomechanicsMotorControl2009; or even just start with Kahn Acadamy [kinematics unit](https://www.khanacademy.org/science/ap-college-physics-1/xf557a762645cccc5:kinematics)). Solid mathematical background is not required, but some basic understanding of linear algebra and calculus is further helpful. For non-linear analysis some understanding of the underpinnings of dynamical systems is helpful (which may require some basic understanding of systems of differential equations to fully appreciate the significance of).#### Notes on tested system specificationsOur pipeline has been tested on what amounts to modern (at 2025) PC gaming desktops with GPU support. However, in principle all code can be run without a GPU, but will take considerably more time for pose tracking at step 1 (by an estimated factor of 5 to 10 times). Step 1 to 4 was performed on a PC equipped with a 12th Gen Intel(R) Core(TM) i9-12900KF, NVIDIA GeForce RTX 4090 GPU, and 64 GB RAM running Windows 10 Home. Masking at step 5 was performed on a Dell XPS 17 9710 system equipped with an Intel Core i9-11980HK processor, NVIDIA GeForce RTX 3060 Laptop GPU, and 64 GB RAM running Windows 11 Home (Build 26100).# Datasets for our casesBelow is a description of the two datasets that were used as test cases for the pipeline. The datasets are available in masked form to reduce identifiability of the individuals in the dataset. 
Following the download guide for further information on how to access the datasets.::: {.callout-note .callout-download collapse="true"}## Download Guide ⬇️The masked data can be downloaded from the repository of Jagellonian University ([FORTHCOMING AFTER REVIEW]()). The unmasked data can be requested from the authors.Please note that the masked video data is not the input of the pipeline. YOLO tracking would not work well on masked videos as detailed information about the body is lost, and only represented as the motion tracking data that comes with the videoes. This motion tracking data is present in the repository and was used as input for the pipeline. The masked data can be used to check for qualitative changes in the behavior of the inviduals, and can be used to create animations such as in step 1.3.**Example of masked videodata**{fig-align="center" width=600}:::### Case 1: Dataset siblingsTen sibling pairs (total = 20 pairs) were observed from Polish urban culture and Yurakaré indigenous culture each. Observations were made during a semi-structured play situation: in the natural environment of the Yurakaré community and laboratory conditions in Poland. Siblings were asked to build a tower together using 54 wooden blocks while sitting facing each other. Each pair had 4 minutes to complete the task. 
This setup allowed for a systematic comparison of coordinated and mutually adjusted movements in siblings, using our pipeline to examine the smoothness of interaction while "doing things together" in these two cultural groups.

::: {.callout-note .callout-download collapse="true"}
## Plotting code for the Siblings dataset 🏴

```{python, eval=FALSE, code_folding="hide"}
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os

# Load the data
df = pd.read_csv('./meta/project_siblings_metadata_ages_gender.csv')

# Data preprocessing
df['AgeDifference'] = abs(df['P1agemonths'] - df['P2agemonths'])
df['AverageAge'] = (df['P1agemonths'] + df['P2agemonths']) / 2
df['GenderCombo'] = df['GenderP1'] + '-' + df['GenderP2']

# Create a 2x2 subplot layout
fig, axs = plt.subplots(2, 2, figsize=(10, 8),
                        gridspec_kw={'width_ratios': [1.5, 1], 'height_ratios': [1.5, 1]})
fig.suptitle('Sibling Dataset Overview', fontsize=16, fontfamily='serif')

# Define colors for gender combinations
colors = {
    'M-M': '#4472C4',  # Blue
    'M-F': '#7030A0',  # Purple
    'F-M': '#548235',  # Green
    'F-F': '#C55A11'   # Rust/Orange
}

# 1. Scatter plot - P1 age vs P2 age
for gender_combo, group in df.groupby('GenderCombo'):
    axs[0, 0].scatter(
        group['P1agemonths'],
        group['P2agemonths'],
        color=colors.get(gender_combo, 'gray'),
        alpha=0.7,
        label=gender_combo,
        s=50
    )

# Add reference line
min_age = min(df['P1agemonths'].min(), df['P2agemonths'].min())
max_age = max(df['P1agemonths'].max(), df['P2agemonths'].max())
axs[0, 0].plot([min_age, max_age], [min_age, max_age], 'k--', alpha=0.3)
axs[0, 0].set_xlabel('P1 Age (months)', fontfamily='serif')
axs[0, 0].set_ylabel('P2 Age (months)', fontfamily='serif')
axs[0, 0].set_title('Participant Ages (P1 vs P2)', fontfamily='serif')
axs[0, 0].grid(True, alpha=0.3)
axs[0, 0].legend()

# 2. Bar chart - Age differences
bar_positions = np.arange(len(df))
bar_width = 0.8
axs[0, 1].bar(
    bar_positions,
    df['AgeDifference'],
    width=bar_width,
    color=[colors.get(combo, 'gray') for combo in df['GenderCombo']]
)
axs[0, 1].set_xticks(bar_positions)
axs[0, 1].set_xticklabels(df['Code'], rotation=90)
axs[0, 1].set_xlabel('Sibling Pair', fontfamily='serif')
axs[0, 1].set_ylabel('Age Difference (months)', fontfamily='serif')
axs[0, 1].set_title('Age Differences by Sibling Pair', fontfamily='serif')
axs[0, 1].grid(True, alpha=0.3, axis='y')

# 3. Pie chart - Gender distribution
gender_counts = df['GenderCombo'].value_counts()
axs[1, 0].pie(
    gender_counts,
    labels=gender_counts.index,
    autopct='%1.1f%%',
    colors=[colors.get(combo, 'gray') for combo in gender_counts.index],
    wedgeprops={'edgecolor': 'w', 'linewidth': 1}
)
axs[1, 0].set_title('Gender Distribution', fontfamily='serif')

# 4. Summary table (note: AgeDifference is already in months, so no conversion is needed)
summary_data = [
    ["Total Sibling Pairs", f"{len(df)}"],
    ["Average P1 Age", f"{df['P1agemonths'].mean():.1f} months"],
    ["Average P2 Age", f"{df['P2agemonths'].mean():.1f} months"],
    ["Average Age Difference", f"{df['AgeDifference'].mean():.1f} months"],
    ["Number of Males", f"{(df['GenderP1'] == 'M').sum() + (df['GenderP2'] == 'M').sum()}"],
    ["Number of Females", f"{(df['GenderP1'] == 'F').sum() + (df['GenderP2'] == 'F').sum()}"]
]

# Turn off axis for table
axs[1, 1].axis('off')
table = axs[1, 1].table(
    cellText=summary_data,
    colLabels=["Measure", "Value"],
    loc='center',
    cellLoc='left'
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
axs[1, 1].set_title('Dataset Summary', fontfamily='serif')

plt.tight_layout()
plt.subplots_adjust(top=0.9)

# Save the figure to a file
output_path = "images/sibling_analysis.png"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close()
```
:::

{fig-align="center" width=600}

### Case 2: Dataset mother-infant

Thirteen infant-caregiver pairs visited the child laboratory monthly for a total of
seven visits. The infants' ages at the first visit ranged from 7 to 9 months. Parents were instructed to "play as usual," with a set of different toys to encourage spontaneous interactions. Each visit lasted approximately 30 minutes and was divided into four stages. For our analyses, we focus on recordings from the stage where the infant was seated in a feeding chair, facing the caregiver sitting directly across. This controlled setup minimizes movement around the room, facilitating video motion tracking analysis. Additionally, our analyses are restricted to top-view recordings, ensuring that both the caregiver and the infant are captured within a single frame.

::: {.callout-note .callout-download collapse="true"}
## Plotting code for the Caregiver-Infant dataset 🏴

```{python, eval=FALSE}
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
plt.ioff()
plt.clf()  # Clear any existing figures
import pandas as pd
import numpy as np
import os

# Load the data
df = pd.read_csv('./meta/project_point_metadata_ages.csv')

# Convert gender to categorical label
df['gender_label'] = df['gender'].map({0: 'Female', 1: 'Male'})

# Create a 2x2 subplot layout
fig, axs = plt.subplots(2, 2, figsize=(12, 10),
                        gridspec_kw={'width_ratios': [1.5, 1], 'height_ratios': [1.5, 1]})
fig.suptitle('Caregiver-Infant Dataset Overview', fontsize=16, fontfamily='serif')

# Define colors for gender
colors = {
    'Female': '#C55A11',  # Orange for females
    'Male': '#4472C4'     # Blue for males
}

# 1. Line plot - Age progression across time points
for idx, row in df.iterrows():
    gender = row['gender_label']
    # Create arrays for days and time points, handling NaN values
    days = []
    timepoints = []
    for i in range(1, 8):  # T1 to T7
        day_col = f'age_T{i}_days'
        if day_col in df.columns and pd.notna(row[day_col]) and row[day_col] > 0:
            days.append(row[day_col])
            timepoints.append(i)
    # Plot the line for this infant
    axs[0, 0].plot(
        timepoints,
        days,
        marker='o',
        color=colors[gender],
        alpha=0.5,
        linewidth=1.5
    )

# Customize the plot
axs[0, 0].set_xlabel('Time Point', fontfamily='serif')
axs[0, 0].set_ylabel('Age (days)', fontfamily='serif')
axs[0, 0].set_title('Infant Age Progression', fontfamily='serif')
axs[0, 0].set_xticks(range(1, 8))
axs[0, 0].set_xticklabels([f'T{i}' for i in range(1, 8)])
axs[0, 0].grid(True, alpha=0.3)

# Add legend patches
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor=colors['Female'], edgecolor='w', label='Female'),
    Patch(facecolor=colors['Male'], edgecolor='w', label='Male')
]
axs[0, 0].legend(handles=legend_elements, loc='upper left')

# 2. Violin plot - Age distribution at each time point
time_point_data = []
labels = []
for i in range(1, 8):  # T1 to T7
    col = f'age_T{i}_days'
    if col in df.columns:
        valid_data = df[col].dropna()
        if len(valid_data) > 0:
            time_point_data.append(valid_data)
            labels.append(f'T{i}')

# Create violin plots
if time_point_data:
    violin_parts = axs[0, 1].violinplot(
        time_point_data,
        showmeans=True,
        showmedians=True
    )
    # Color the violin plots
    for i, pc in enumerate(violin_parts['bodies']):
        pc.set_facecolor('#7030A0')  # Purple
        pc.set_edgecolor('black')
        pc.set_alpha(0.7)

# Set labels
axs[0, 1].set_xticks(range(1, len(labels) + 1))
axs[0, 1].set_xticklabels(labels)
axs[0, 1].set_xlabel('Time Point', fontfamily='serif')
axs[0, 1].set_ylabel('Age (days)', fontfamily='serif')
axs[0, 1].set_title('Age Distribution by Time Point', fontfamily='serif')
axs[0, 1].grid(True, alpha=0.3, axis='y')

# 3. Pie chart - Gender distribution
gender_counts = df['gender_label'].value_counts()
axs[1, 0].pie(
    gender_counts,
    labels=gender_counts.index,
    autopct='%1.1f%%',
    colors=[colors[g] for g in gender_counts.index],
    wedgeprops={'edgecolor': 'w', 'linewidth': 1}
)
axs[1, 0].set_title('Gender Distribution', fontfamily='serif')

# 4. Summary table
# Calculate averages for each time point, handling NaN values
averages_days = []
for i in range(1, 8):
    col = f'age_T{i}_days'
    if col in df.columns:
        avg = df[col].mean()
        if not pd.isna(avg):
            avg_months = avg / 30.44  # Convert to months
            averages_days.append([f"T{i} Average Age", f"{avg:.1f} days ({avg_months:.1f} months)"])

# Count participants at each time point
participants = []
for i in range(1, 8):
    col = f'age_T{i}_days'
    if col in df.columns:
        count = df[col].notna().sum()
        participants.append([f"Participants at T{i}", f"{count}"])

# Overall statistics
summary_data = [
    ["Total Infants", f"{len(df)}"],
    ["Females", f"{(df['gender'] == 0).sum()}"],
    ["Males", f"{(df['gender'] == 1).sum()}"]
]

# Add age ranges if columns exist
if 'age_T1_days' in df.columns:
    summary_data.append(["Age Range T1",
                         f"{df['age_T1_days'].min():.0f}-{df['age_T1_days'].max():.0f} days"])
last_tp = 7
while last_tp > 0:
    col = f'age_T{last_tp}_days'
    if col in df.columns and df[col].notna().sum() > 0:
        summary_data.append([f"Age Range T{last_tp}",
                             f"{df[col].min():.0f}-{df[col].max():.0f} days"])
        break
    last_tp -= 1

# Age means
if 'age_T1_days' in df.columns:
    summary_data.append(["Mean Age T1",
                         f"{df['age_T1_days'].mean():.1f} days ({df['age_T1_days'].mean()/30.44:.1f} months)"])
if last_tp > 0:
    col = f'age_T{last_tp}_days'
    if col in df.columns:
        summary_data.append([f"Mean Age T{last_tp}",
                             f"{df[col].mean():.1f} days ({df[col].mean()/30.44:.1f} months)"])

# Create the table with the most important summary statistics
axs[1, 1].axis('off')
table = axs[1, 1].table(
    cellText=summary_data + participants,
    loc='center',
    cellLoc='left'
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
axs[1, 1].set_title('Dataset Summary', fontfamily='serif')

plt.tight_layout()
plt.subplots_adjust(top=0.9)

# Save the figure to a file
output_path = "images/mother_infant_analysis.png"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path, dpi=300, bbox_inches='tight')
plt.close(fig)  # Explicitly close the figure object
```
:::

{fig-align="center" width=600}

Note that in our GitHub repository we use a short data sample from this dataset, from a caregiver-infant pair for whom we are allowed to share the video data for the current purposes.

::: {.callout-note .callout-installation collapse="true"}
## DIY: Setting up the repository on your local machine 🛠️

### Step 0: Github repo

You should first clone the repository [InterPerDynPipeline](https://github.com/WimPouw/InterPerDynPipeline) and move to the root directory. The step 1 code for tracking is in the `./code_STEP1_posetrackingprocessing/` folder. In this folder you will find the Python script `yolo_tracking_processing.py`. To check whether you have the correct script, you can compare against the code chunk provided below.

First install git if you do not have it installed yet. You can find instructions on how to install git [here](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git).

Then you can clone the repository using the following commands in your (anaconda) terminal:

```bash
git clone https://github.com/WimPouw/InterPerDynPipeline
cd ./InterPerDynPipeline/
```
:::

# Step 1: Motion tracking, signal processing, and wrangling

The motion tracking and signal processing pipeline is divided into three steps:

1. **Tracking**: This step involves tracking the movements of individuals in the videos using YOLOv8. The output is a video with annotated keypoints and a CSV file containing the coordinates of these keypoints.
2. **Signal Processing**: This step involves processing the keypoint data to extract relevant features for analysis.
The outputs are processed time series files and a flat dataset containing our smoothness measures, directly ready for statistical analysis.
3. **Animations**: This step involves animating the processed data together with the original video data to ensure that the processed data are of sufficient quality to reflect behavioral changes.

## Step 1.1: Tracking two persons in videos for top-view (Python)

We have tried several pose tracking solutions, such as OpenPose (model 25B; @caoOpenPoseRealtimeMultiPerson2021a) and Mediapipe (default highest-complexity BlazePose model; @lugaresiMediaPipeFrameworkBuilding2019). We found that these models are not well equipped to track persons from a top view, most likely because they are not trained on ground-truth poses of persons from top-view camera angles. However, we found that the heaviest YOLOv8 model ("yolov8x-pose-p6.pt") does perform very well, especially as compared to the other models we tested. Thus, for this pipeline we recommend the YOLOv8 model [@jocherYOLOUltralytics2023] for top-view tracking. Always check the tracking quality of the model that you use on your own dataset; YOLOv8 may not be the best fit for all datasets.

There has further been work comparing YOLOv8 human pose estimation against gold-standard motion tracking systems. For example, @giuliettiRealTimeHumanPose2025 showed that the model we use here, `yolov8x-pose-p6.pt` (69.4M parameters), has an 18.2 mm mean per-joint position error (MPJPE) when incorporated in a calibrated 4-camera system, compared to an OptiTrack marker-based optical motion capture system (12 cameras). The lowest errors were for the upper body (nose, shoulders, elbows, wrists), which are the keypoints we use in our pipeline.
@araujoDetectingCharacterizingGroup2025 performed a similar comparison with OptiTrack to assess YOLOv8 (exact model unspecified), showing that torso and head orientation reached > 70% accuracy at a distance of 300-500 cm from the camera as compared to the gold standard. Do note that these satisfactory performance levels were achieved under different conditions than ours: we have a top-view setup, while these tests were performed with multi-camera eye-level side and frontal camera setups. There is thus further room to compare the performance of YOLOv8 against gold-standard motion tracking systems for top-view setups.

::: {.callout-note .callout-installation collapse="true"}
## DIY Step 1.1 Motion Tracking Set-Up and Execution 🛠️

### Step 1: Install requirements

For each script we have provided a `requirements.txt` file. You can install the requirements using pip, by first navigating to the folder where the script is located and then running the following command in your terminal:

```bash
cd [yourspecificrootadress]/code_STEP1_posetrackingprocessing/
pip install -r requirements.txt
```

### Step 2: Download the YOLOv8 model

In the current GitHub repo we have a lightweight model `yolov8n-pose.pt` (~6.5MB) for immediate testing of the code. However, this model is not as accurate as the heaviest model. You should download the heaviest model (`yolov8x-pose-p6.pt`) and save it to the `./code_STEP1_posetrackingprocessing/model/` directory: https://github.com/ultralytics/assets/releases/download/v8.1.0/yolov8x-pose-p6.pt

**Note** that if you're running the heavyweight model, you will need GPU support for it to run in a reasonable time (we processed our data on an NVIDIA GeForce RTX 4090 GPU).
The lightweight model can run on a CPU.

### Step 3: Run the Python script

Assuming you have videos in your data folder (`./data_raw/`) and a `.pt` model in the model folder, you can run the script using the following command:

```bash
cd [yourspecificrootadress]/code_STEP1_posetrackingprocessing/
python yolo_tracking_processing.py
```
:::

The code below is the script `yolo_tracking_processing.py` that you can run to track the videos. The output will be a video with annotated keypoints and a CSV file containing the coordinates of these keypoints. The script processes each video frame by frame, detecting people and their pose keypoints (17 points, as listed in the `GetKeypoint` class in the script). We filter out duplicate detections and skeletons with excessive missing data. Specifically, for each processed video the script generates two output files: an annotated video showing the original footage with skeleton overlays (green points for accepted keypoints, red for filtered ones, and blue lines connecting the points), and a CSV file containing the raw coordinate data (frame number, person ID, keypoint ID, x-coordinate, y-coordinate). The output filenames include parameters like "c150" (150 px proximity threshold) and "miss95" (95% missing-data tolerance), which control how the system handles potential duplicate detections and incomplete skeletons. The user can adjust the parameters `close_threshold` and `miss_tolerance` to tune the detection behavior.
We output the annotated video and the CSV file in the `./datatracked_afterSTEP1/` folder.

::: {.callout-note .callout-installation collapse="true"}
## Pipeline code 1.1 YOLO tracking 🚩

```{python, eval=FALSE, code_folding="hide"}
from ultralytics import YOLO
from pydantic import BaseModel
import cv2
import csv
import numpy as np
import glob
import os
import torch  # for gpu support
from itertools import combinations
import sys

torch.cuda.set_device(0)

# Load the model
modelfolder = './model/'
modellocation = glob.glob(modelfolder + "*.pt")[0]
modelfile = os.path.basename(modellocation)
print(f"We are loading in the following YOLO model: {modelfile}")
model = YOLO(modellocation)

# main variables
video_folder = "../data_raw/"  # avi mp4 or other video formats
video_files = (glob.glob(video_folder + "*.mp4") + glob.glob(video_folder + "*.avi") +
               glob.glob(video_folder + "*.mov") + glob.glob(video_folder + "*.mkv"))
step1resultfolder = "../datatracked_afterSTEP1/"
print(video_files)

# keypoint names
class GetKeypoint(BaseModel):
    NOSE: int = 0
    LEFT_EYE: int = 1
    RIGHT_EYE: int = 2
    LEFT_EAR: int = 3
    RIGHT_EAR: int = 4
    LEFT_SHOULDER: int = 5
    RIGHT_SHOULDER: int = 6
    LEFT_ELBOW: int = 7
    RIGHT_ELBOW: int = 8
    LEFT_WRIST: int = 9
    RIGHT_WRIST: int = 10
    LEFT_HIP: int = 11
    RIGHT_HIP: int = 12
    LEFT_KNEE: int = 13
    RIGHT_KNEE: int = 14
    LEFT_ANKLE: int = 15
    RIGHT_ANKLE: int = 16

get_keypoint = GetKeypoint()

# Define skeleton connections
skeleton = [
    (get_keypoint.LEFT_SHOULDER, get_keypoint.RIGHT_SHOULDER),
    (get_keypoint.LEFT_SHOULDER, get_keypoint.LEFT_ELBOW),
    (get_keypoint.RIGHT_SHOULDER, get_keypoint.RIGHT_ELBOW),
    (get_keypoint.LEFT_ELBOW, get_keypoint.LEFT_WRIST),
    (get_keypoint.RIGHT_ELBOW, get_keypoint.RIGHT_WRIST),
    (get_keypoint.LEFT_SHOULDER, get_keypoint.LEFT_HIP),
    (get_keypoint.RIGHT_SHOULDER, get_keypoint.RIGHT_HIP),
    (get_keypoint.LEFT_HIP, get_keypoint.RIGHT_HIP),
    (get_keypoint.LEFT_HIP, get_keypoint.LEFT_KNEE),
    (get_keypoint.RIGHT_HIP, get_keypoint.RIGHT_KNEE),
    (get_keypoint.LEFT_KNEE, get_keypoint.LEFT_ANKLE),
    (get_keypoint.RIGHT_KNEE, get_keypoint.RIGHT_ANKLE),
]

def tensor_to_matrix(results_tensor):
    # this just takes the results output of YOLO and converts it to a matrix,
    # making it easier to do quick calculations on the coordinates
    results_list = results_tensor.tolist()
    results_matrix = np.matrix(results_list)
    results_matrix[results_matrix == 0] = np.nan
    return results_matrix

def check_for_duplication(results):
    # this threshold determines how close two skeletons must be in order to be
    # considered the same person. Arbitrarily chosen for now.
    close_threshold = 150  # note that the value is in pixels
    # missing data tolerance
    miss_tolerance = 0.95  # this means we can miss up to 95% of the keypoints
    drop_indices = []
    if len(results[0].keypoints.xy) > 1:  # only proceed if more than one skeleton is detected
        conf_scores = []
        # get detection confidence for each skeleton
        for person in tensor_to_matrix(results[0].keypoints.conf):
            conf_scores.append(np.mean(person))
        # this list stores which comparisons need to be made
        combos = list(combinations(range(len(results[0].keypoints.xy)), 2))
        # now loop through these comparisons
        for combo in combos:
            closeness = abs(np.nanmean(tensor_to_matrix(results[0].keypoints.xy[combo[0]]) -
                                       tensor_to_matrix(results[0].keypoints.xy[combo[1]])))
            # if any of them indicate that two skeletons are very close together,
            # we keep the one with higher tracking confidence, and remove the other
            if closeness < close_threshold:
                conf_list = [conf_scores[combo[0]], conf_scores[combo[1]]]
                idx_min = conf_list.index(min(conf_list))
                drop_indices.append(combo[idx_min])
        # additional checks:
        for person in range(len(results[0].keypoints.xy)):
            keypoints_missed = np.isnan(tensor_to_matrix(results[0].keypoints.xy[person])).sum() / 2
            # calculate percentage of missing keypoints
            perc_missed = keypoints_missed / len(tensor_to_matrix(results[0].keypoints.xy[person]))
            # compare % of missing keypoints against the threshold that we set earlier
            if perc_missed > miss_tolerance:
                drop_indices.append(person)
    return list(set(drop_indices))

for video_path in video_files:
    # only if the output is not there yet
    if os.path.exists(step1resultfolder + os.path.basename(video_path).split('.')[0] +
                      "_annotated_layer1_c150_miss95.mp4"):
        print(f"Output video already exists for {video_path}. Skipping...")
        continue
    # vidname without extension
    vidname = os.path.basename(video_path)
    vidname = vidname.split('.')[0]
    # Open the video
    cap = cv2.VideoCapture(video_path)
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Define the output video writer
    output_path = step1resultfolder + vidname + "_annotated_layer1_c150_miss95.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    # Prepare CSV file
    csv_path = step1resultfolder + vidname + '_keypoints_data_layer1.csv'
    csv_file = open(csv_path, 'w', newline='')
    csv_writer = csv.writer(csv_file)
    # Write header
    header = ['frame', 'person', 'keypoint', 'x', 'y']
    csv_writer.writerow(header)
    frame_count = 0
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        # Run YOLOv8 inference on the frame
        results = model(frame)
        annotated_frame = frame
        # write empty rows if no person is detected
        if len(results[0].keypoints.xy) == 0:
            csv_writer.writerow([frame_count, None, None, None, None])
        # only do this if a person is detected
        if len(results[0].keypoints.xy) > 0:
            # Process the results
            drop_indices = check_for_duplication(results)
            for person_idx, person_keypoints in enumerate(results[0].keypoints.xy):
                if person_idx not in drop_indices:
                    colourcode = (0, 255, 0)
                else:
                    colourcode = (255, 0, 0)
                for keypoint_idx, keypoint in enumerate(person_keypoints):
                    x, y = keypoint
                    # Write to CSV
                    csv_writer.writerow([frame_count, person_idx, keypoint_idx, x.item(), y.item()])
                    # Draw keypoint on the frame
                    cv2.circle(annotated_frame, (int(x), int(y)), 5, colourcode, -1)
                # Draw skeleton
                for connection in skeleton:
                    if connection[0] < len(person_keypoints) and connection[1] < len(person_keypoints):
                        start_point = tuple(map(int, person_keypoints[connection[0]]))
                        end_point = tuple(map(int, person_keypoints[connection[1]]))
                        if all(start_point) and all(end_point):  # Check if both points are valid
                            cv2.line(annotated_frame, start_point, end_point, (255, 0, 0), 2)
        # Write the frame to the output video
        out.write(annotated_frame)
        frame_count += 1
    # Release everything
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    csv_file.close()
    print(f"Output video saved as {output_path}")
    print(f"Keypoints data saved as {csv_path}")
```
:::

Here is an example of the output video with annotated keypoints and skeletons. The green points indicate accepted keypoints, while the red points indicate filtered ones. The blue lines connect the keypoints to form a skeleton.

<html><video width="600" height="400" controls><source src="./images/videosrerendered/sample.mp4" type="video/mp4"> Your browser does not support the video tag.</video></html>

Figure S3. Example output video with annotated keypoints and skeletons. The green points indicate accepted keypoints. The blue lines connect the keypoints to form a skeleton.

Here is an example of the output CSV file containing the coordinates of the keypoints. The CSV file contains the following columns: frame number, person ID, keypoint ID, x-coordinate, and y-coordinate.
Each row corresponds to a detected keypoint in a specific frame.

::: {.callout-note .callout-installation collapse="true"}
## Example output and code example for YOLO tracking 📊

```{python examplecsvfile}
import pandas as pd
import glob
import os

folderstep1output = "./dataoutput_STEP1_1_rawposedata/"

# Load the CSV file
csv_file = glob.glob(os.path.join(folderstep1output, "*.csv"))[0]
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
```
:::

## Step 1.2: Raw pose data post-processing for time series matrices (Python)

The raw pose data, with frame, person, keypoint, and x,y position data, is not yet in a format that is easy to work with for our purposes. Additionally, sometimes no person (or not all persons) is detected in a frame. We therefore transform the raw pose data into a time series matrix format, with time in seconds as one column and different time-varying variables in the other columns. Furthermore, we smooth and interpolate the data where necessary. The Python code below executes this second step: we store the position data per person, and further derive interpersonal distance measures, kinematic metrics (e.g., speed), and tracking quality indicators (e.g., a tracking status that indicates where the data were interpolated).

Next to determining time in seconds from the frame rate of the videos, the script starts by assigning left and right person IDs based on horizontal position in the frame (left being ID 1 and right being ID 2). It then processes the time data to calculate statistics for tracked people using upper-body keypoints (as the lower-body keypoints are unreliable from top view and may be ignored).
Specifically, we reduce the dimensionality of all upper-body keypoints by determining relative positions and their distances. We calculate a bounding box (determined by the outer edges of the keypoints) and determine its center in x and y coordinates (`centroid_x`, `centroid_y`) and the P1-P2 distance between those centroids (`distance`), as well as the center of mass (`com_x`, `com_y`) for each person and its distance (`distance_com`). Further, we determine the midpoint of the shoulders as another relevant position (`shoulder_midpoint_x`, `shoulder_midpoint_y`) and compute its distance (`distance_shoulder_midpoint`). Additionally, we retain information about body parts such as arm motions by taking the wrist positions (`wrist_left_x`, `wrist_left_y`, `wrist_right_x`, `wrist_right_y`), and further derive total motion per second, called speed (based on the Euclidean sum of the changes in position along x and y).

Before calculating our primary measure of interest, which will be the object of our statistical analyses, we need to process the data in two important ways. First, we check for any remaining **missing values** after processing and confirm time continuity to detect any frame drops or temporal misalignments. The script handles missing data through two complementary techniques: linear interpolation for brief gaps in tracking (up to 5 frames), and carrying the last known location forward for longer gaps (up to 20 frames).

Second, we need to **smooth the data**. Motion tracking, even when quite accurate, is prone to small frame-to-frame fluctuations in position, even when the tracked object or person is not moving. In the case of pose tracking from top-down camera angles, there is quite some noise-related fluctuation in the keypoint estimates. When we calculate something like interpersonal proximity, these fluctuations will cause fluctuations in proximity. Noise is especially amplified in derived measures, such as speed, acceleration, and jerk.
In the provided pipeline, we use a Savitzky-Golay filter (implemented in `SciPy` [@2020SciPy-NMeth]). The filter slides along the time series, smoothing the data within a particular window length by fitting a polynomial of a pre-determined order to the data points. In our case, we set a time window of 11 data points and a third-degree polynomial. The plot below shows the effect of smoothing the speed data in this way. The script also calculates the speed of the center of mass and the wrist positions, and here too we smooth the values using the Savitzky-Golay filter to reduce noise that is amplified during differentiation (see @winterBiomechanicsMotorControl2009).

We recommend that users check the effect of smoothing on their own data, and adjust the window length and polynomial order to see whether the movement granularity of interest is adequately represented in the time series (the animation code that we provide will help with checking whether one is smoothing too little or too much).
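To make the gap-handling step concrete, here is a minimal, self-contained sketch of gap-length-aware filling on a single position trace. The `fill_gaps` helper and its arguments are illustrative rather than the pipeline's exact implementation; note that pandas' own `limit` argument would partially fill longer gaps, which is why this sketch measures each gap's length first before deciding between interpolation and forward-filling.

```python
import numpy as np
import pandas as pd

def fill_gaps(series, interp_max=5, ffill_max=20):
    """Linearly interpolate gaps of up to `interp_max` consecutive NaNs;
    carry the last known value forward for longer gaps (up to `ffill_max`)."""
    isna = series.isna()
    # label each run of consecutive NaN / non-NaN values, then measure run lengths
    run_id = (isna != isna.shift()).cumsum()
    gap_len = isna.groupby(run_id).transform('sum')
    out = series.copy()
    short_gap = isna & (gap_len <= interp_max)
    long_gap = isna & (gap_len > interp_max)
    out[short_gap] = series.interpolate(method='linear', limit_area='inside')[short_gap]
    out[long_gap] = series.ffill(limit=ffill_max)[long_gap]
    return out

# hypothetical x-position trace: a 2-frame gap and an 8-frame gap
x = pd.Series([10.0, 11.0, None, None, 14.0, 15.0,
               None, None, None, None, None, None, None, None, 23.0])
x_filled = fill_gaps(x)
# the 2-frame gap is linearly interpolated (12.0, 13.0);
# the 8-frame gap is held at the last known value (15.0)
```

After filling, the filled trace would then be smoothed (e.g., with `scipy.signal.savgol_filter` using the window of 11 samples and third-order polynomial described above) before any differentiation.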
::: {.callout-note .callout-installation collapse="true"}
## Example code for checking behavior of smoothing of time series to reduce noise-related jitter 🏴

```{python, eval=TRUE, code_folding="hide"}
import pandas as pd
import numpy as np
import glob
import os
import scipy.signal
import matplotlib.pyplot as plt

def normalize_array(arr):
    """Normalize an array to the range [0, 1]"""
    return (arr - np.min(arr)) / (np.max(arr) - np.min(arr))

folderstep12output = "./dataoutput_STEP1_2_timeseries/"

# Load the CSV file
csv_file = glob.glob(os.path.join(folderstep12output, "*.csv"))[0]
df = pd.read_csv(csv_file)

# get raw speed values
raw_speed = df['wrist_left_p1_speed'].values

# smooth the speed time series
speed = scipy.signal.savgol_filter(raw_speed, window_length=11, polyorder=3)

# comparison smoothing
extra_speed = scipy.signal.savgol_filter(raw_speed, window_length=21, polyorder=3)

plt.plot(normalize_array(raw_speed[0:150]))
plt.plot(normalize_array(speed[0:150]))
plt.plot(normalize_array(extra_speed[0:150]))
plt.title("Effect of smoothing on speed data using Savitzky-Golay filter")
plt.xlabel("Frames")
plt.legend(labels=["raw speed", "smoothed speed (window 11)", "extra smoothed speed (window 21)"])
# plt.show()

# save the plot
plt.savefig("images/smoothing_speed_effect.png")
plt.close()
```
:::

The more sophisticated measure that we have created, which is the object of our later analyses, is the proximity calculation measure (see the function `calculate_approach()` in the step 2 script). This function establishes an initial reference position for both participants and then projects their subsequent movements onto the connecting line between them.
This allows for quantifying approach and retreat behaviors for each person separately (which a distance measure does not allow for) while allowing for relative angle changes between them: positive values indicate movement toward the other person (relative to the start), while negative values indicate withdrawal (relative to the start).

Below is an animation to exemplify what we are measuring. We here capture only movements that change the distance between the two people. When someone moves in a circle around the other person, we want to make sure we do not register that as approach or retreat (which we would if we just captured, for example, horizontal position). However, we also want to capture each person's own approach/avoidance value (which would not be possible with a simple distance measure). By always measuring the signed length of the projection from the initial position onto the current connecting line, the method tracks cumulative approach/avoidance for each person over time.
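The core projection logic can be illustrated numerically in a few lines. This is a toy sketch of the idea rather than the pipeline's `calculate_approach()` function itself; the helper `approach_value` is hypothetical. Each person's displacement from their reference position is projected onto the unit vector pointing toward the other person's current position.

```python
import numpy as np

def approach_value(ref_pos, cur_pos, other_cur_pos):
    """Signed approach of one person: the projection of their displacement
    (from their reference position) onto the unit vector pointing from
    their current position toward the other person's current position.
    Positive = moved toward the other person; negative = moved away."""
    connect = other_cur_pos - cur_pos
    direction = connect / np.linalg.norm(connect)
    displacement = cur_pos - ref_pos
    return float(np.dot(displacement, direction))

# P1's reference position is (0, 0); P2 sits at (10, 0)
ref_p1 = np.array([0.0, 0.0])

# Moving straight toward P2 gives a positive value
print(approach_value(ref_p1, np.array([2.0, 0.0]), np.array([10.0, 0.0])))   # 2.0

# Moving straight away gives a negative value
print(approach_value(ref_p1, np.array([-2.0, 0.0]), np.array([10.0, 0.0])))  # -2.0

# A small sideways (circling) step contributes almost nothing,
# since the displacement is nearly orthogonal to the connecting line
print(approach_value(ref_p1, np.array([0.0, 0.5]), np.array([10.0, 0.0])))
```

Because the projection is always taken onto the *current* connecting line, the measure keeps working when the two people rotate around each other, as the prose above describes.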
::: {.callout-note .callout-installation collapse="true"}
## Code for creating animation to exemplify behavior of proximity measure 🏴

```{python}
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib.patches as patches
import matplotlib.lines as mlines
import os
import matplotlib

# Force matplotlib to use the Agg backend to avoid display issues
matplotlib.use('Agg')

# Create output directory if it doesn't exist
os.makedirs("images/proximity_visualization", exist_ok=True)

# Simulation parameters
num_frames = 100
p1_color = '#3498db'      # Blue
p2_color = '#e74c3c'      # Red
vector_color = '#2ecc71'  # Green

# Initial positions for two people on a horizontal line:
# Person 1 on the left, Person 2 on the right (same y-coordinate)
p1_initial = np.array([3, 5])
p2_initial = np.array([7, 5])

# Generate movement paths with pronounced approach and avoidance:
# Person 1 makes a dramatic approach followed by retreat;
# Person 2 makes a clear retreat followed by approach
t = np.linspace(0, 2 * np.pi, num_frames)

p1_path_x = p1_initial[0] + 2.5 * np.sin(t / 2)   # Larger horizontal movement
p1_path_y = p1_initial[1] + 2.5 * np.sin(t)       # Larger vertical movement

p2_path_x = p2_initial[0] - 1.2 * np.sin(t)       # More horizontal movement
p2_path_y = p2_initial[1] - 2.0 * np.sin(t * 1.5) # More vertical movement

# Store positions for each frame
p1_positions = np.column_stack((p1_path_x, p1_path_y))
p2_positions = np.column_stack((p2_path_x, p2_path_y))


def calculate_approach(positions1, positions2):
    """Calculate approach values similar to the pipeline's approach,
    using the first frame as reference."""
    reference_p1_pos = positions1[0].copy()
    reference_p2_pos = positions2[0].copy()
    p1_approach = np.zeros(len(positions1))
    p2_approach = np.zeros(len(positions2))
    for i in range(len(positions1)):
        # Current positions
        p1_pos = positions1[i]
        p2_pos = positions2[i]
        # Current connecting vector and direction
        current_connect = p2_pos - p1_pos
        current_distance = np.linalg.norm(current_connect)
        if current_distance > 0:
            current_direction = current_connect / current_distance
            # Vectors from the reference positions
            p1_vector = p1_pos - reference_p1_pos
            p2_vector = p2_pos - reference_p2_pos
            # Project onto the connecting line
            p1_approach[i] = np.dot(p1_vector, current_direction)
            # For P2, a positive value should mean movement toward P1,
            # so we use the negative connecting direction (from P2 to P1)
            p2_direction = -current_direction
            p2_approach[i] = np.dot(p2_vector, p2_direction)
    return p1_approach, p2_approach


# Calculate approach values
p1_approach_values, p2_approach_values = calculate_approach(p1_positions, p2_positions)


def create_detailed_animation():
    """Create the main visualization: people, connecting vector, projections."""
    # Larger figure for better visibility, explicit DPI
    fig, ax = plt.subplots(figsize=(12, 10), dpi=100)

    def init():
        ax.clear()
        ax.set_xlim(0, 10)
        ax.set_ylim(0, 10)
        ax.set_aspect('equal')
        return []

    def animate(i):
        ax.clear()
        ax.set_xlim(0, 10)
        ax.set_ylim(0, 10)
        ax.set_aspect('equal')
        ax.set_title('Proximity Approach Visualization (Extreme Movements)', fontsize=16)
        ax.set_xlabel('X Position', fontsize=12)
        ax.set_ylabel('Y Position', fontsize=12)

        # Get current positions
        p1_pos = p1_positions[i]
        p2_pos = p2_positions[i]

        # Draw the initial connecting vector (reference, dashed)
        ax.plot([p1_positions[0][0], p2_positions[0][0]],
                [p1_positions[0][1], p2_positions[0][1]],
                color=vector_color, linewidth=1, linestyle='--', alpha=0.5, zorder=0)
        # Draw the current connection vector
        ax.plot([p1_pos[0], p2_pos[0]], [p1_pos[1], p2_pos[1]],
                color=vector_color, linewidth=2.5, linestyle='-', zorder=1)

        # Draw person markers
        ax.scatter(p1_pos[0], p1_pos[1], s=180, color=p1_color,
                   edgecolor='black', linewidth=1.5, zorder=3)
        ax.scatter(p2_pos[0], p2_pos[1], s=180, color=p2_color,
                   edgecolor='black', linewidth=1.5, zorder=3)
        # Draw initial positions
        ax.scatter(p1_positions[0][0], p1_positions[0][1], s=90, color=p1_color,
                   alpha=0.5, edgecolor='black', linewidth=1, zorder=2)
        ax.scatter(p2_positions[0][0], p2_positions[0][1], s=90, color=p2_color,
                   alpha=0.5, edgecolor='black', linewidth=1, zorder=2)

        # Draw trails (last 15 positions)
        start_idx = max(0, i - 15)
        ax.plot(p1_positions[start_idx:i+1, 0], p1_positions[start_idx:i+1, 1],
                '-', color=p1_color, alpha=0.5, linewidth=2)
        ax.plot(p2_positions[start_idx:i+1, 0], p2_positions[start_idx:i+1, 1],
                '-', color=p2_color, alpha=0.5, linewidth=2)

        # Show the current approach values as text
        approach_text = (f"P1 approach: {p1_approach_values[i]:.2f}\n"
                         f"P2 approach: {p2_approach_values[i]:.2f}")
        ax.text(0.05, 0.95, approach_text, transform=ax.transAxes,
                verticalalignment='top', fontsize=14, fontweight='bold',
                bbox=dict(boxstyle='round', facecolor='white', alpha=0.9))

        # Add projection visualization
        if i > 0:
            # Get current connecting direction
            connect_vector = p2_pos - p1_pos
            connect_distance = np.linalg.norm(connect_vector)
            connect_direction = connect_vector / connect_distance

            # Person 1 projection onto the connecting line
            p1_vector = p1_pos - p1_positions[0]
            p1_projection = np.dot(p1_vector, connect_direction) * connect_direction
            p1_proj_point = p1_positions[0] + p1_projection

            # Person 2 projection, using the direction from P2 to P1
            p2_vector = p2_pos - p2_positions[0]
            p2_direction = -connect_direction
            p2_projection = np.dot(p2_vector, p2_direction) * p2_direction
            p2_proj_point = p2_positions[0] + p2_projection

            # Projection line and direction arrow for P1
            p1_approach = p1_approach_values[i]
            if abs(p1_approach) > 0.1:  # Only draw for non-trivial approach values
                ax.plot([p1_positions[0][0], p1_proj_point[0]],
                        [p1_positions[0][1], p1_proj_point[1]],
                        '--', color=p1_color, linewidth=2.5)
                arrow_length = min(abs(p1_approach) * 0.2, 0.6)  # Scale arrow size
                arrow_width = 0.15
                if p1_approach > 0:  # Approaching: arrow toward the projection point
                    ax.arrow(p1_positions[0][0], p1_positions[0][1],
                             arrow_length * connect_direction[0],
                             arrow_length * connect_direction[1],
                             head_width=arrow_width, head_length=arrow_width * 1.5,
                             fc=p1_color, ec='black', linewidth=1, zorder=4)
                else:  # Retreating: arrow away from the projection point
                    ax.arrow(p1_positions[0][0], p1_positions[0][1],
                             -arrow_length * connect_direction[0],
                             -arrow_length * connect_direction[1],
                             head_width=arrow_width, head_length=arrow_width * 1.5,
                             fc=p1_color, ec='black', linewidth=1, zorder=4)
                # Value label near the projection line
                mid_x = (p1_positions[0][0] + p1_proj_point[0]) / 2
                mid_y = (p1_positions[0][1] + p1_proj_point[1]) / 2
                ax.text(mid_x, mid_y, f"{p1_approach:.2f}", color=p1_color,
                        fontsize=10, fontweight='bold',
                        bbox=dict(facecolor='white', alpha=0.7, boxstyle='round,pad=0.2'))

            # Projection line and direction arrow for P2
            p2_approach = p2_approach_values[i]
            if abs(p2_approach) > 0.1:
                ax.plot([p2_positions[0][0], p2_proj_point[0]],
                        [p2_positions[0][1], p2_proj_point[1]],
                        '--', color=p2_color, linewidth=2.5)
                arrow_length = min(abs(p2_approach) * 0.2, 0.6)
                arrow_width = 0.15
                if p2_approach > 0:  # Approaching: arrow toward P1
                    ax.arrow(p2_positions[0][0], p2_positions[0][1],
                             arrow_length * p2_direction[0],
                             arrow_length * p2_direction[1],
                             head_width=arrow_width, head_length=arrow_width * 1.5,
                             fc=p2_color, ec='black', linewidth=1, zorder=4)
                else:  # Retreating: arrow away from P1
                    ax.arrow(p2_positions[0][0], p2_positions[0][1],
                             -arrow_length * p2_direction[0],
                             -arrow_length * p2_direction[1],
                             head_width=arrow_width, head_length=arrow_width * 1.5,
                             fc=p2_color, ec='black', linewidth=1, zorder=4)
                mid_x = (p2_positions[0][0] + p2_proj_point[0]) / 2
                mid_y = (p2_positions[0][1] + p2_proj_point[1]) / 2
                ax.text(mid_x, mid_y, f"{p2_approach:.2f}", color=p2_color,
                        fontsize=10, fontweight='bold',
                        bbox=dict(facecolor='white', alpha=0.7, boxstyle='round,pad=0.2'))

            # Small markers at the projection points
            ax.scatter(p1_proj_point[0], p1_proj_point[1], color=p1_color, s=70, marker='x')
            ax.scatter(p2_proj_point[0], p2_proj_point[1], color=p2_color, s=70, marker='x')

        # Horizontal dotted line at the initial y-coordinate to emphasize the horizontal start
        ax.axhline(y=p1_initial[1], color='gray', linestyle=':', alpha=0.5)

        # Add legend
        legend_elements = [
            patches.Patch(facecolor=p1_color, edgecolor='black', label='Person 1'),
            patches.Patch(facecolor=p2_color, edgecolor='black', label='Person 2'),
            patches.Patch(facecolor=vector_color, edgecolor='black', label='Connection Vector'),
            mlines.Line2D([], [], color='gray', linestyle=':', label='Initial Horizontal Line'),
            mlines.Line2D([], [], color=p1_color, linestyle='--', linewidth=2,
                          label='Projection Length = Approach Value')
        ]
        ax.legend(handles=legend_elements, loc='upper right', fontsize=10)

        # Add a frame counter
        ax.text(0.95, 0.05, f"Frame: {i}/{num_frames - 1}", transform=ax.transAxes,
                horizontalalignment='right', verticalalignment='bottom', fontsize=10,
                bbox=dict(boxstyle='round', facecolor='white', alpha=0.5))

        # Add description text
        description = ("Proximity Calculation Visualization\n"
                       "- Large dots: Current positions\n"
                       "- Small dots: Initial positions (horizontal alignment)\n"
                       "- Green line: Current connecting vector\n"
                       "- Dashed lines: Projection of movement (length = approach value)\n"
                       "- Arrows: Direction of approach/retreat\n"
                       "- Positive values: Movement toward other person\n"
                       "- Negative values: Movement away from other person\n"
                       "- X markers: Projected positions")
        ax.text(0.05, 0.05, description, transform=ax.transAxes,
                verticalalignment='bottom', fontsize=9,
                bbox=dict(boxstyle='round', facecolor='white', alpha=0.7))
        return []

    # Create the animation with explicit FPS and DPI, and save as GIF
    ani = FuncAnimation(fig, animate, frames=num_frames, init_func=init,
                        blit=True, interval=100)
    ani.save('images/proximity_visualization/proximity_calculation_extreme.gif',
             writer='pillow', fps=10, dpi=100)
    plt.close(fig)


# Run the animation; fall back to static frames if GIF creation fails
try:
    print("Creating animation...")
    create_detailed_animation()
    print("GIF created successfully in the 'images/proximity_visualization' folder.")
except Exception as e:
    print(f"Error creating animation: {e}")
    print("Trying alternative approach with static frames...")
    try:
        # Create a static image sequence instead
        os.makedirs("images/proximity_visualization/frames", exist_ok=True)
        # Create 10 static frames instead of the full animation
        sample_frames = np.linspace(0, num_frames - 1, 10, dtype=int)
        for idx, i in enumerate(sample_frames):
            fig, ax = plt.subplots(figsize=(10, 8), dpi=100)
            ax.set_xlim(0, 10)
            ax.set_ylim(0, 10)
            ax.set_title(f'Proximity Calculation (Frame {i})', fontsize=14)
            # Get current positions
            p1_pos = p1_positions[i]
            p2_pos = p2_positions[i]
            # Draw the connection vector
            ax.plot([p1_pos[0], p2_pos[0]], [p1_pos[1], p2_pos[1]],
                    color=vector_color, linewidth=2)
            # Draw person markers
            ax.scatter(p1_pos[0], p1_pos[1], s=150, color=p1_color, label='Person 1')
            ax.scatter(p2_pos[0], p2_pos[1], s=150, color=p2_color, label='Person 2')
            # Draw initial positions
            ax.scatter(p1_positions[0][0], p1_positions[0][1], s=50, color=p1_color, alpha=0.5)
            ax.scatter(p2_positions[0][0], p2_positions[0][1], s=50, color=p2_color, alpha=0.5)
            # Horizontal dotted line at the initial y-coordinate
            ax.axhline(y=p1_initial[1], color='gray', linestyle=':', alpha=0.5)
            # Show approach values
            ax.text(0.5, 0.95, f"Person 1 approach: {p1_approach_values[i]:.2f}",
                    transform=ax.transAxes, ha='center', va='top',
                    bbox=dict(boxstyle='round', facecolor=p1_color, alpha=0.3))
            ax.text(0.5, 0.9, f"Person 2 approach: {p2_approach_values[i]:.2f}",
                    transform=ax.transAxes, ha='center', va='top',
                    bbox=dict(boxstyle='round', facecolor=p2_color, alpha=0.3))
            ax.legend()
            # Save the frame
            plt.tight_layout()
            plt.savefig(f'images/proximity_visualization/frames/frame_{idx:02d}.png')
            plt.close(fig)
        print("Static frames created successfully in 'images/proximity_visualization/frames' folder.")
    except Exception as e:
        print(f"Alternative approach also failed: {e}")
```
:::

The output of this script is a timeseries matrix for each video, which contains the time in seconds and the calculated variables. The timeseries matrices are saved in the `./dataoutput_STEP1_2_timeseries/` folder.
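The processing step that follows smooths the distance and speed series with a Savitzky-Golay filter (window length 11, polynomial order 3). To build some intuition for what that call does, here is a minimal sketch on a toy noisy series (the sine-plus-noise signal is illustrative only, not pipeline data):

```python
import numpy as np
from scipy.signal import savgol_filter

rng = np.random.default_rng(0)
t = np.linspace(0, 2 * np.pi, 200)
noisy = np.sin(t) + rng.normal(scale=0.3, size=t.size)  # toy "distance" series

# Same settings as in the processing script: window of 11 samples, cubic polynomial
smooth = savgol_filter(noisy, window_length=11, polyorder=3)

# The smoothed series tracks the underlying sine more closely than the raw one
raw_err = np.mean((noisy - np.sin(t)) ** 2)
smooth_err = np.mean((smooth - np.sin(t)) ** 2)
```

The filter fits a local cubic polynomial inside each 11-sample window, so it suppresses frame-to-frame jitter while preserving peaks better than a plain moving average would.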
The code below is the script `STEP2_process_timeseries.py` that you can run to process the raw pose data.

::: {.callout-note .callout-installation collapse="true"}
## Pipeline code step 1.2 for processing to timeseries data 🚩

```{python, eval=FALSE, code_folding="hide"}
import pandas as pd
import numpy as np
import glob
import os
import cv2
import math
from bisect import bisect_left
from scipy.signal import savgol_filter

# Path Definitions
INPUT_LAYER1_PATH = '../dataoutput_STEP1_1_rawposedata/'  # Input directory containing tracked keypoint data from STEP1
VIDEO_PATH = "../data_raw/"                               # Raw video files directory
OUTPUT_PATH = '../dataoutput_STEP1_2_timeseries/'         # Output directory for processed timeseries data

# Variable Explanations:
# =====================================================
# Positional Variables:
# - x, y: 2D coordinates in the image frame
# - x_min, x_max, y_min, y_max: Bounding box coordinates for a person
# - centroid_x, centroid_y: Center point of a person's bounding box
# - com_x, com_y: Center of mass for a person (average of upper body keypoint positions)
# - shoulder_midpoint_*: Midpoint between left and right shoulders for each person (p1, p2)
# - wrist_left_*, wrist_right_*: Positions of left and right wrists for each person (p1, p2)
#
# Distance Variables:
# - distance: Euclidean distance between the centroids of both people
# - distance_com: Euclidean distance between centers of mass of both people
# - distance_shoulder_midpoint: Distance between shoulder midpoints of both people
# - *_speed: Euclidean speed of different body parts (calculated from position differences)
# - *_speed_smooth: Smoothed version of speed using Savitzky-Golay filter
#
# Movement Analysis:
# - p1_com_approach_pos, p2_com_approach_pos: Position of each person's COM relative to their
#   initial position, projected onto the connecting line between people. Positive values
#   indicate movement toward the other person.
#
# Tracking Status:
# - both_tracked: Boolean indicating if both people are tracked in the current frame
# - single_tracked: Boolean indicating if only one person is tracked in the current frame
# =====================================================


def frame_to_time(ts, fps):
    """Convert frame numbers to time in seconds"""
    ts["time"] = [row[1]["frame"] / fps for row in ts.iterrows()]
    return ts


def take_closest(myList, myNumber):
    """
    Assumes myList is sorted. Returns closest value to myNumber.
    If two numbers are equally close, return the smallest number.
    """
    pos = bisect_left(myList, myNumber)
    if pos == 0:
        return myList[0]
    if pos == len(myList):
        return myList[-1]
    before = myList[pos - 1]
    after = myList[pos]
    if after - myNumber < myNumber - before:
        return after
    else:
        return before


def assign_left_right(ts):
    """Assign person IDs (0 or 1) based on x-position in the frame"""
    min_x = np.min([val for val in ts['x'] if val != 0])
    max_x = np.max([val for val in ts['x'] if val != 0])
    for index, row in ts.iterrows():
        if row['x'] == 0:  # Skip untracked points
            continue
        if take_closest([min_x, max_x], row['x']) == min_x:
            ts.loc[index, 'person'] = 0  # Left person
        else:
            ts.loc[index, 'person'] = 1  # Right person
    return ts


def process_time_data(ts):
    """Calculate statistics for tracked people using upper body keypoints"""
    ts_upper = ts[ts['keypoint'] < 11]  # Filter to upper body keypoints only
    # Calculate stats per person and time
    stats = ts_upper.groupby(['time', 'person']).agg({
        'x': ['min', 'max', 'mean'],
        'y': ['min', 'max', 'mean']
    }).reset_index()
    stats.columns = ['time', 'person', 'x_min', 'x_max', 'com_x',
                     'y_min', 'y_max', 'com_y']
    stats['centroid_x'] = (stats['x_min'] + stats['x_max']) / 2
    stats['centroid_y'] = (stats['y_min'] + stats['y_max']) / 2
    return stats


def process_people_data_vectorized(ts, stats):
    """
    Process keypoint data to calculate relative positions and distances.
    Preserves time alignment and handles COM/centroid assignment.
    """
    # Get all unique times from the original data
    all_times = sorted(ts['time'].unique())
    result = pd.DataFrame(index=all_times)
    result.index.name = 'time'

    # Process shoulders first (keypoints 5 and 6)
    shoulders = ts[ts['keypoint'].isin([5, 6])].groupby(['time', 'person']).agg({
        'x': 'mean',
        'y': 'mean'
    }).reset_index()

    # Process wrists (keypoints 7 and 8)
    wrists = ts[ts['keypoint'].isin([7, 8])].pivot_table(
        index=['time', 'person'], columns='keypoint', values=['x', 'y']
    ).reset_index()
    wrists.columns = ['time', 'person', 'left_x', 'right_x', 'left_y', 'right_y']

    # Get person-specific data
    p0_stats = stats[stats['person'] == 0].set_index('time')
    p1_stats = stats[stats['person'] == 1].set_index('time')

    # Process distances where both people exist
    common_times = sorted(set(p0_stats.index) & set(p1_stats.index))
    if common_times:  # Only calculate if we have common times
        result.loc[common_times, 'distance'] = np.sqrt(
            (p0_stats.loc[common_times, 'centroid_x'] - p1_stats.loc[common_times, 'centroid_x'])**2
            + (p0_stats.loc[common_times, 'centroid_y'] - p1_stats.loc[common_times, 'centroid_y'])**2
        )
        result.loc[common_times, 'distance_com'] = np.sqrt(
            (p0_stats.loc[common_times, 'com_x'] - p1_stats.loc[common_times, 'com_x'])**2
            + (p0_stats.loc[common_times, 'com_y'] - p1_stats.loc[common_times, 'com_y'])**2
        )

    # Process shoulders per person
    for person, prefix in [(0, 'p1'), (1, 'p2')]:
        s_person = shoulders[shoulders['person'] == person].set_index('time')
        result[f'shoulder_midpoint_{prefix}_x'] = s_person['x']
        result[f'shoulder_midpoint_{prefix}_y'] = s_person['y']

    # Calculate shoulder midpoint distance where possible
    shoulder_cols = ['shoulder_midpoint_p1_x', 'shoulder_midpoint_p2_x',
                     'shoulder_midpoint_p1_y', 'shoulder_midpoint_p2_y']
    shoulder_mask = result[shoulder_cols].notna().all(axis=1)
    if shoulder_mask.any():
        result.loc[shoulder_mask, 'distance_shoulder_midpoint'] = np.sqrt(
            (result.loc[shoulder_mask, 'shoulder_midpoint_p1_x']
             - result.loc[shoulder_mask, 'shoulder_midpoint_p2_x'])**2
            + (result.loc[shoulder_mask, 'shoulder_midpoint_p1_y']
               - result.loc[shoulder_mask, 'shoulder_midpoint_p2_y'])**2
        )

    # Process wrists per person and add COM/centroid
    for person, prefix in [(0, 'p1'), (1, 'p2')]:
        w_person = wrists[wrists['person'] == person].set_index('time')
        result[f'wrist_left_{prefix}_x'] = w_person['left_x']
        result[f'wrist_left_{prefix}_y'] = w_person['left_y']
        result[f'wrist_right_{prefix}_x'] = w_person['right_x']
        result[f'wrist_right_{prefix}_y'] = w_person['right_y']
        # Use the correct stats object based on person ID
        person_stats = p0_stats if person == 0 else p1_stats
        result[f'com_{prefix}_x'] = person_stats['com_x']
        result[f'com_{prefix}_y'] = person_stats['com_y']
        result[f'centroid_{prefix}_x'] = person_stats['centroid_x']
        result[f'centroid_{prefix}_y'] = person_stats['centroid_y']

    # Add tracking quality indicators using original data
    person_counts = ts.groupby('time')['person'].nunique()
    result['both_tracked'] = result.index.map(lambda x: person_counts.get(x, 0) == 2)
    result['single_tracked'] = result.index.map(lambda x: person_counts.get(x, 0) == 1)
    return result


def calculate_proximity_approach(timeseries_data):
    """
    Calculate each person's position relative to their initial position,
    projecting movement onto the current connecting line between people.
    This handles rotation and changing spatial relationships between people.
    Positive values indicate movement toward the other person from the
    initial position. The reference positions are only established when
    both people are detected.
    """
    # Create columns for our measurements
    if 'p1_com_approach_pos' not in timeseries_data.columns:
        timeseries_data['p1_com_approach_pos'] = np.nan
    if 'p2_com_approach_pos' not in timeseries_data.columns:
        timeseries_data['p2_com_approach_pos'] = np.nan

    # Sort data by time
    sorted_data = timeseries_data.sort_values('time')

    # Reference positions, established at the first valid frame
    reference_p1_pos = None
    reference_p2_pos = None
    reference_distance = None

    # First pass: find a reference frame where both people are detected
    for idx, row in sorted_data.iterrows():
        # Skip rows with NaN values or where both_tracked is False
        if (np.isnan(row['com_p1_x']) or np.isnan(row['com_p1_y'])
                or np.isnan(row['com_p2_x']) or np.isnan(row['com_p2_y'])
                or ('both_tracked' in row.index and row['both_tracked'] == False)):
            continue
        # Get positions for this frame
        p1_pos = np.array([row['com_p1_x'], row['com_p1_y']])
        p2_pos = np.array([row['com_p2_x'], row['com_p2_y']])
        # Calculate connecting vector
        connect_vector = p2_pos - p1_pos
        distance = np.linalg.norm(connect_vector)
        if distance > 0:
            # We found a valid reference frame
            reference_p1_pos = p1_pos.copy()
            reference_p2_pos = p2_pos.copy()
            reference_distance = distance
            print(f"Reference frame established at time={row['time']}")
            print(f"  Reference p1_pos: {reference_p1_pos}")
            print(f"  Reference p2_pos: {reference_p2_pos}")
            print(f"  Reference distance: {reference_distance}")
            break

    if reference_p1_pos is None:
        print("ERROR: Could not establish a reference frame. "
              "No valid frames found with both people detected.")
        return timeseries_data

    # Second pass: calculate projected positions for all frames
    for idx, row in sorted_data.iterrows():
        # Skip rows with NaN values
        if (np.isnan(row['com_p1_x']) or np.isnan(row['com_p1_y'])
                or np.isnan(row['com_p2_x']) or np.isnan(row['com_p2_y'])):
            continue
        # Get current positions
        p1_pos = np.array([row['com_p1_x'], row['com_p1_y']])
        p2_pos = np.array([row['com_p2_x'], row['com_p2_y']])
        # Calculate current connecting vector and direction
        current_connect = p2_pos - p1_pos
        current_distance = np.linalg.norm(current_connect)
        # Skip frames where people are at the same position
        if current_distance == 0:
            continue
        current_direction = current_connect / current_distance
        # Vectors from reference position to current position
        p1_vector = p1_pos - reference_p1_pos
        p2_vector = p2_pos - reference_p2_pos
        # Project these vectors onto the current connecting line;
        # positive values mean moving toward the other person.
        # For p2 the projection uses the direction from P2 to P1.
        p1_projection = np.dot(p1_vector, current_direction)
        p2_projection = np.dot(p2_vector, -current_direction)
        # Store values
        timeseries_data.loc[idx, 'p1_com_approach_pos'] = p1_projection
        timeseries_data.loc[idx, 'p2_com_approach_pos'] = p2_projection

    # Verify results
    filled_p1 = timeseries_data['p1_com_approach_pos'].notna().sum()
    filled_p2 = timeseries_data['p2_com_approach_pos'].notna().sum()
    print(f"Frames with filled values - p1: {filled_p1}, p2: {filled_p2}")
    return timeseries_data


def main():
    """Main processing function"""
    # Create output directory if it doesn't exist
    os.makedirs(OUTPUT_PATH, exist_ok=True)

    # Get all CSV files from layer 1 processing
    layer1_files = glob.glob(INPUT_LAYER1_PATH + '*.csv')

    # Process each file
    for file_path in layer1_files:
        # Extract video name from file path
        vid_name = os.path.basename(file_path).split('_keypoints_data_layer1.csv')[0]
        print(f"Processing video: {vid_name}")

        # Check if already processed
        output_file = f"{OUTPUT_PATH}/{vid_name}_processed_data_layer2.csv"
        if os.path.exists(output_file):
            print("Already processed, skipping...")
            continue

        # Determine video format and get FPS
        video_path = None
        for ext in ['.avi', '.mp4', '.mov']:
            if os.path.exists(f"{VIDEO_PATH}/{vid_name}{ext}"):
                video_path = f"{VIDEO_PATH}/{vid_name}{ext}"
                break
        if not video_path:
            print(f"Video file not found for {vid_name}")
            continue
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        cap.release()
        print(f"Working on: {vid_name} with fps = {fps}")

        # Load and preprocess data
        ts = pd.read_csv(file_path)
        ts = frame_to_time(ts, fps=fps)
        ts = assign_left_right(ts)

        # Calculate statistics
        stats = process_time_data(ts)

        # Process tracked data
        processed_data = process_people_data_vectorized(ts, stats)

        # Create final data frame
        bb_data = pd.merge(stats, processed_data.reset_index(),
                           on='time', how='outer')

        # Extract time series data (using person 0 as reference);
        # copy to avoid SettingWithCopyWarning
        timeseries_data = bb_data[bb_data['person'] == 0].copy()
        # Remove unnecessary columns
        timeseries_data = timeseries_data.drop(columns='person')

        # Calculate desired time step based on FPS
        desired_time_step = 1 / fps

        # Interpolate missing values
        nan_cols = timeseries_data.columns[timeseries_data.isna().any()].tolist()
        timeseries_data[nan_cols] = timeseries_data[nan_cols].interpolate()

        # Smooth distance with Savitzky-Golay filter
        timeseries_data['distance_smooth'] = savgol_filter(
            timeseries_data['distance'].values, 11, 3)

        # Calculate and smooth wrist speeds
        for wrist in ['wrist_left_p1', 'wrist_right_p1', 'wrist_left_p2', 'wrist_right_p2']:
            # Speed as Euclidean distance between consecutive positions
            timeseries_data[f'{wrist}_speed'] = np.sqrt(
                timeseries_data[f'{wrist}_x'].diff()**2
                + timeseries_data[f'{wrist}_y'].diff()**2
            )
            # Apply Savitzky-Golay filter for smoothing
            timeseries_data[f'{wrist}_speed_smooth'] = savgol_filter(
                timeseries_data[f'{wrist}_speed'].values, 11, 3
            )

        # Forward-fill NaN values before calculating proximity approach
        timeseries_data = timeseries_data.ffill()

        # Calculate proximity approach
        timeseries_data = calculate_proximity_approach(timeseries_data)

        # Fill any remaining NaN values
        timeseries_data = timeseries_data.ffill()

        # Smooth proximity approach values
        timeseries_data['p1_com_approach_pos'] = savgol_filter(
            timeseries_data['p1_com_approach_pos'].values, 11, 3)
        timeseries_data['p2_com_approach_pos'] = savgol_filter(
            timeseries_data['p2_com_approach_pos'].values, 11, 3)

        # Check for any remaining NaN values
        print("Checking for NaN values...")
        nan_counts = timeseries_data.isna().sum()
        if nan_counts.sum() > 0:
            print("Found NaN values after processing:")
            print(nan_counts)
        else:
            print("No NaN values found.")

        # Save processed data
        timeseries_data.to_csv(output_file, index=False)

        # Check for time continuity
        print("Checking time continuity...")
        time_diffs = np.array([round(val, 2) for val in np.diff(timeseries_data['time'])])
        if not np.all(time_diffs == desired_time_step):
            print(f"Found gaps at times: {np.where(time_diffs != desired_time_step)[0]}")
        else:
            print("No missing time points.")


if __name__ == "__main__":
    main()
```
:::

To provide an example of the data, we show here the first rows of a resultant timeseries.

::: {.callout-note .callout-installation collapse="true"}
## Example output timeseries: 📊

```{python examplecsvfilelayer12}
import pandas as pd
import glob
import os

folderstep12output = "./dataoutput_STEP1_2_timeseries/"

# Load the first CSV file from the output folder
csv_file = glob.glob(os.path.join(folderstep12output, "*.csv"))[0]
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
```
:::

## Step 1.3: Animation with original video data

We advise users to double-check the quality of the tracking data and the derived variables, both to verify tracking quality and to build a more intuitive, qualitative understanding of their quantitative methods.
We have argued elsewhere that, especially when working with derived measurements of high-dimensional data, researchers can train their intuitive grasp of their measurements by relating them in real time to the raw video data [@miaoDIMSDashboardExploring2025]. In this way we come to understand better what the measure is and is not responding to; which, as we mention in the introduction to the companion paper, means becoming observant rather than a disinterested observer [@hackingRepresentingInterveningIntroductory1983]. The animation below is generated using the script `STEP3_animation.py`, which couples video with quantitative data, allowing the timeseries to be checked qualitatively in real time against what is happening in the video. The user can select which variables should be plotted in the video-plus-timeseries animation. The code below provides the animation; please check the DIY section for how to set up the motion tracking and run the code.

::: {.callout-note .callout-installation collapse="true"}
## Code chunk: Animation code for coupling video with quantitative data 🚩

```{python, eval=FALSE, code_folding="hide"}
import pandas as pd
import numpy as np
import glob
import os
import cv2
import math
from bisect import bisect_left
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
import seaborn as sns
import tqdm
import tempfile
from moviepy import VideoFileClip

# Path Definitions
INPUT_LAYER1_PATH = '../dataoutput_STEP1_2_timeseries/'  # Processed timeseries data from STEP2
VIDEO_PATH = "../dataoutput_STEP1_1_rawposedata/"        # Annotated video files from STEP1
OUTPUT_PATH = '../dataoutput_STEP1_3_animations/'        # Output directory for animations

targetvideo = "*sample_annotated_layer1_c150_miss95.mp4"
# Set SAMPLE_VIDEO to True to process only the sample video, False to process all videos
SAMPLE_VIDEO = True

# Create output directory if it doesn't exist
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Animate only the sample video?
if SAMPLE_VIDEO:
    allvidsnew = glob.glob(VIDEO_PATH + "*" + targetvideo)
else:
    allvidsnew = glob.glob(VIDEO_PATH + "*.mp4")
print(f"Found {len(allvidsnew)} videos to process")

# Which variables to animate with the video
animate_variables = {
    'x_min': False, 'x_max': False, 'com_x': False,
    'y_min': False, 'y_max': False, 'com_y': False,
    'centroid_x': False, 'centroid_y': False,
    'distance': False, 'distance_com': False,
    'shoulder_midpoint_p1_x': False, 'shoulder_midpoint_p1_y': False,
    'shoulder_midpoint_p2_x': False, 'shoulder_midpoint_p2_y': False,
    'distance_shoulder_midpoint': True,
    'wrist_left_p1_x': False, 'wrist_left_p1_y': False,
    'wrist_right_p1_x': False, 'wrist_right_p1_y': False,
    'com_p1_x': False, 'com_p1_y': False,
    'centroid_p1_x': False, 'centroid_p1_y': False,
    'wrist_left_p2_x': False, 'wrist_left_p2_y': False,
    'wrist_right_p2_x': False, 'wrist_right_p2_y': False,
    'com_p2_x': False, 'com_p2_y': False,
    'centroid_p2_x': False, 'centroid_p2_y': False,
    'distance_smooth': True,
    'wrist_left_p1_speed': False, 'wrist_left_p1_speed_smooth': True,
    'wrist_right_p1_speed': False, 'wrist_right_p1_speed_smooth': True,
    'wrist_left_p2_speed': False, 'wrist_left_p2_speed_smooth': True,
    'wrist_right_p2_speed': False, 'wrist_right_p2_speed_smooth': True,
    'p1_com_approach_pos': True, 'p2_com_approach_pos': True
}


# Plotting functions
def plot_timeseries(timeseries_data, current_time=None, figsize=(15, 8)):
    """
    Visualize the processed STEP1.2 motion variable data together with the
    original video. Groups variables by modality with consistent coloring
    for p1/p2 and left/right.

    Parameters:
    -----------
    timeseries_data : pandas.DataFrame
        DataFrame containing all the motion analysis columns
    current_time : float, optional
        Current time in seconds to mark with a vertical line
    figsize : tuple, optional
        Figure size in inches (width, height)
    """
    # Keep only variables marked as True in animate_variables
    active_vars = [var for var, active in animate_variables.items()
                   if active and var in timeseries_data.columns]
    if not active_vars:
        print("No variables selected for animation!")
        return None

    # Consistent color scheme: blue family for p1, red family for p2
    p1_colors = {'left': '#1f77b4', 'right': '#aec7e8'}  # Dark blue, light blue
    p2_colors = {'left': '#d62728', 'right': '#ff9896'}  # Dark red, light red
    other_colors = ['#ff7f0e', '#2ca02c', '#9467bd', '#8c564b',
                    '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']

    # Group variables by modality
    distance_vars = [var for var in active_vars if 'distance' in var]
    position_vars = [var for var in active_vars
                     if any(x in var for x in ['_x', '_y', 'approach_pos'])
                     and 'distance' not in var]
    speed_vars = [var for var in active_vars if 'speed' in var]

    # Number of subplots needed
    n_plots = sum([len(distance_vars) > 0, len(position_vars) > 0, len(speed_vars) > 0])
    if n_plots == 0:
        return None

    # Create figure with subplots
    fig, axes = plt.subplots(n_plots, 1, figsize=figsize, squeeze=False)
    axes = axes.flatten()
    plot_idx = 0

    # Helper function to get a consistent color and style per variable
    def get_style(var_name):
        # Determine person (p1/p2)
        if '_p1_' in var_name:
            person = 'p1'
        elif '_p2_' in var_name:
            person = 'p2'
        else:
            person = 'other'
        # Determine side (left/right)
        if 'left' in var_name:
            side = 'left'
        elif 'right' in var_name:
            side = 'right'
        else:
            side = 'other'
        # Color
        if person == 'p1':
            color = p1_colors.get(side, p1_colors['left'])
        elif person == 'p2':
            color = p2_colors.get(side, p2_colors['left'])
        else:
            color = other_colors[0]  # First color for non-person variables
        # Linestyle (solid for left, dashed for right)
        linestyle = '--' if side == 'right' else '-'
        # Alpha (lower for raw speed data, full for smoothed)
        if 'smooth' in var_name:
            alpha = 1.0
        elif any(x in var_name for x in ['speed', 'velocity']):
            alpha = 0.3
        else:
            alpha = 1.0
        return color, linestyle, alpha

    # 1. Distance plots
    if distance_vars:
        ax = axes[plot_idx]
        color_idx = 0
        for var in distance_vars:
            if any(x in var for x in ['_p1_', '_p2_']):
                color, linestyle, alpha = get_style(var)
            else:
                color = other_colors[color_idx % len(other_colors)]
                linestyle = '-'
                alpha = 1.0
                color_idx += 1
            ax.plot(timeseries_data['time'], timeseries_data[var],
                    label=var.replace('_', ' '), linewidth=2,
                    color=color, linestyle=linestyle, alpha=alpha)
        ax.set_title('Distances Over Time')
        ax.set_ylabel('Distance')
        ax.grid(True, alpha=0.3)
        ax.legend()
        if 'distance_shoulder_midpoint' in distance_vars or 'distance' in distance_vars:
            ax.invert_yaxis()
        plot_idx += 1

    # 2. Position plots (group p1/p2 and left/right together)
    if position_vars:
        ax = axes[plot_idx]
        for var in position_vars:
            color, linestyle, alpha = get_style(var)
            ax.plot(timeseries_data['time'], timeseries_data[var],
                    label=var.replace('_', ' '), linewidth=2,
                    color=color, linestyle=linestyle, alpha=alpha)
        ax.set_title('Positions Over Time')
        ax.set_ylabel('Position')
        ax.grid(True, alpha=0.3)
        ax.legend()
        plot_idx += 1

    # 3. Speed plots (group p1/p2 and left/right together)
    if speed_vars:
        ax = axes[plot_idx]
        for var in speed_vars:
            color, linestyle, alpha = get_style(var)
            ax.plot(timeseries_data['time'], timeseries_data[var],
                    label=var.replace('_', ' '), linewidth=2,
                    color=color, linestyle=linestyle, alpha=alpha)
        ax.set_title('Speeds Over Time')
        ax.set_ylabel('Speed')
        ax.grid(True, alpha=0.3)
        ax.legend()
        plot_idx += 1

    # Vertical line for the current time, if specified
    if current_time is not None:
        for ax in axes[:plot_idx]:
            ax.axvline(x=current_time, color='red', linestyle='--',
                       alpha=0.7, linewidth=2)

    # Common x-label and layout
    axes[plot_idx - 1].set_xlabel('Time (seconds)')
    plt.tight_layout()
    return fig


# Create an animation for each video
for vids in allvidsnew:
    vidname = os.path.basename(vids)
    # Strip the "_annotated_layer1" label, the extension, and the "_c150_miss95" label
    vidname = vidname.replace("_annotated_layer1", "")
    vidname = vidname[:-4]
    vidname = vidname.replace("_c150_miss95", "")
    print(f"\nProcessing video: {vidname}")

    # Check if the CSV file exists
    csv_path = os.path.join(INPUT_LAYER1_PATH, f'{vidname}_processed_data_layer2.csv')
    if not os.path.exists(csv_path):
        print(f"CSV file not found: {csv_path}")
        continue

    # Load the CSV file
    timeseries_data = pd.read_csv(csv_path)

    # Output paths
    temp_output = os.path.join(OUTPUT_PATH, f'{vidname}_distance_layer2_temp.mp4')
    final_output = os.path.join(OUTPUT_PATH, f'{vidname}_distance_layer2.mp4')

    # If it already exists, skip
    if os.path.exists(final_output):
        print("Already processed, skipping...")
        continue

    # Load the video file in OpenCV
    cap = cv2.VideoCapture(vids)
    if not cap.isOpened():
        print(f"Error opening video: {vids}")
        continue

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Video properties: {width}x{height}, {fps} FPS, {total_frames} frames")

    # Calculate time step based on FPS
    time_step = 1.0 /
fpsprint(f"Time step: {time_step:.4f} seconds per frame")# Define the output video writer fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))# Create temporary directory for plotswith tempfile.TemporaryDirectory() as temp_dir:print("Creating animated video...")# loop over the frames with tqdm progress bar current_time =0.0# Start at time 0for frame_idx in tqdm.tqdm(range(total_frames), desc=f"Processing {vidname}"):# read the frame success, frame = cap.read()ifnot success:break# plot the timeseries with current time fig = plot_timeseries(timeseries_data, current_time)if fig isNone:print("No plot generated, skipping frame") out.write(frame) current_time += time_step # Increment by time stepcontinue# save the plot to a temp file plot_path = os.path.join(temp_dir, f'plot_{frame_idx:06d}.png') fig.savefig(plot_path, dpi=100, bbox_inches='tight') plt.close(fig) # Important: close figure to free memory# Read the plot image plot_img = cv2.imread(plot_path)if plot_img isNone:print(f"Error reading plot image: {plot_path}") out.write(frame) current_time += time_step # Increment by time stepcontinue# Calculate overlay area (bottom third of the frame) slice_start =2* (height //3) slice_height = height - slice_start# Resize plot to fit overlay area plot_img_resized = cv2.resize(plot_img, (width, slice_height))# Create overlay on the frame frame_copy = frame.copy() frame_copy[slice_start:slice_start + slice_height, :, :] = plot_img_resized# write the frame to the output video out.write(frame_copy) current_time += time_step # Increment by time step (seconds)# Release everything cap.release() out.release()print(f"Temporary video saved, now re-encoding with MoviePy...")# Re-encode with MoviePy for better compatibilitytry: clip = VideoFileClip(temp_output) clip.write_videofile(final_output, codec='libx264', audio_codec='aac') clip.close()# Remove temporary fileif os.path.exists(temp_output): os.remove(temp_output)print(f"Final 
output video saved as {final_output}")exceptExceptionas e:print(f"Error during MoviePy re-encoding: {e}")print(f"Temporary video available at: {temp_output}")print("\nAll videos processed!")```:::# Step 2: Summary feature extraction: Smoothness and other featuresWe have motivated in the main paper accompanying this notebook that smoothness can be a useful feature to understand the quality of interactions. Our pipeline focuses on the extraction of this specific feature, while we also show how easy it is to generate other features from the time series data that are processed after step 1.We take **two approaches** to quantify smoothness of the movement data. The first approach is based on the spectral arc length metric, which has been shown to be a stable measure of smoothness for prolonged (quasi-cyclical) time series data [@balasubramanianAnalysisMovementSmoothness2015]. The approach is based on analyzing the frequency spectrum, specifically the changes in energy over the different frequencies, for the lower frequencies (< 10Hz; but this can be set by the user). Specifally it calculates the spectral arc length (distance of two points along a curve) from Fourier magnitude spectrum of the speed data (or in our case changes in the distance or proximity positions). This code is directly copied from the [original repository](https://github.com/siva82kb/smoothness) by the lead developer of the metric, Sivak Balasubramanian. We refer to the [@balasubramanianAnalysisMovementSmoothness2015] for further details and validation of this novel measure of smoothness. 
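To build intuition for what the arc length of a magnitude spectrum captures, here is a minimal self-contained sketch. It is *not* the pipeline's SPARC implementation (shown below): it omits the zero padding and the amplitude-threshold refinement, and the signals, sampling rate, and noise level are purely illustrative.

```python
import numpy as np

def arc_length_of_spectrum(signal, fs, fc=10.0):
    """Negative arc length of the normalized magnitude spectrum below fc.

    Values closer to zero indicate a smoother signal. This mirrors the core
    idea of SPARC, without the padding/thresholding of the full metric.
    """
    n = len(signal)
    freqs = np.fft.rfftfreq(n, d=1.0 / fs)
    mag = np.abs(np.fft.rfft(signal))
    mag = mag / mag.max()          # normalize magnitudes to [0, 1]
    sel = freqs <= fc              # keep only the low-frequency band
    f_sel = freqs[sel] / fc        # normalize the frequency axis
    m_sel = mag[sel]
    # Sum of Euclidean step lengths along the (f, |M(f)|) curve
    return -np.sum(np.sqrt(np.diff(f_sel) ** 2 + np.diff(m_sel) ** 2))

fs = 100.0
t = np.arange(0, 5, 1 / fs)
rng = np.random.default_rng(0)
smooth = np.sin(2 * np.pi * 1.0 * t)                # clean 1 Hz movement
jerky = smooth + 0.3 * rng.standard_normal(len(t))  # same movement + noise

sal_smooth = arc_length_of_spectrum(smooth, fs)
sal_jerky = arc_length_of_spectrum(jerky, fs)
print(sal_smooth, sal_jerky)
```

Because noise spreads energy across the spectrum, the noisy signal's magnitude curve wiggles more, yielding a longer (more negative) arc length than the clean signal.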
As can be seen below, higher (i.e., less negative) values of the spectral arc length indicate smoother movements, while lower values indicate more jerky movements.

::: {.callout-note .callout-installation collapse="true"}
## Pipeline code step 2, function for calculating spectral arc length 🚩

```{python, code_folding="hide"}
import numpy as np

# Directly copied from the original repository:
# https://github.com/siva82kb/smoothness/blob/master/python/smoothness.py
def spectral_arclength(movement, fs, padlevel=4, fc=10.0, amp_th=0.05):
    """
    Calculates the smoothness of the given speed profile using the modified
    spectral arc length metric.

    Parameters
    ----------
    movement : np.array
        The array containing the movement speed profile.
    fs : float
        The sampling frequency of the data.
    padlevel : integer, optional
        Indicates the amount of zero padding to be done to the movement
        data for estimating the spectral arc length. [default = 4]
    fc : float, optional
        The max. cut off frequency for calculating the spectral arc
        length metric. [default = 10.]
    amp_th : float, optional
        The amplitude threshold to be used for determining the cut off
        frequency up to which the spectral arc length is to be estimated.
        [default = 0.05]

    Returns
    -------
    sal : float
        The spectral arc length estimate of the given movement's smoothness.
    (f, Mf) : tuple of two np.arrays
        The frequency (f) and the magnitude spectrum (Mf) of the given
        movement data. This spectrum runs from 0 to fs/2.
    (f_sel, Mf_sel) : tuple of two np.arrays
        The portion of the spectrum that is selected for calculating the
        spectral arc length.

    Notes
    -----
    This is the modified spectral arc length metric, which has been tested
    only for discrete movements. It is suitable for movements that are a few
    seconds long; for long movements it might be slow and the results might
    not make sense (like any other smoothness metric).

    Examples
    --------
    >>> t = np.arange(-1, 1, 0.01)
    >>> move = np.exp(-5*pow(t, 2))
    >>> sal, _, _ = spectral_arclength(move, fs=100.)
    >>> '%.5f' % sal
    '-1.41403'
    """
    # Number of zeros to be padded.
    nfft = int(pow(2, np.ceil(np.log2(len(movement))) + padlevel))
    # Frequency
    f = np.arange(0, fs, fs/nfft)
    # Normalized magnitude spectrum
    Mf = abs(np.fft.fft(movement, nfft))
    Mf = Mf/max(Mf)
    # Indices to choose only the spectrum within the given cut off frequency Fc.
    # NOTE: This is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cut off for
    # arc length calculation).
    fc_inx = ((f <= fc)*1).nonzero()
    f_sel = f[fc_inx]
    Mf_sel = Mf[fc_inx]
    # Choose the amplitude threshold based cut off frequency.
    # Index of the last point on the magnitude spectrum that is greater than
    # or equal to the amplitude threshold.
    inx = ((Mf_sel >= amp_th)*1).nonzero()[0]
    fc_inx = range(inx[0], inx[-1]+1)
    f_sel = f_sel[fc_inx]
    Mf_sel = Mf_sel[fc_inx]
    # Calculate arc length
    new_sal = -sum(np.sqrt(pow(np.diff(f_sel)/(f_sel[-1] - f_sel[0]), 2) +
                           pow(np.diff(Mf_sel), 2)))
    return new_sal, (f, Mf), (f_sel, Mf_sel)
```
:::

The second approach is based on the log dimensionless squared jerk metric, the traditional measure of smoothness in biomechanics [@hoganSensitivitySmoothnessMeasures2009]. Jerk is the third derivative of our proximity time series. We therefore take the *change* in proximity from timepoint to timepoint, divided by the time interval. Doing this once, on the proximity values, gives us the first derivative: speed. The derivative of speed is calculated the same way, giving us *acceleration*, or how fast the speed is changing from timepoint to timepoint. The derivative of acceleration is then *jerk*. We utilize jerk here because of its theoretical importance in the human movement sciences, for example in modeling complex biomechanical movements as well as interpersonal coordination.
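The derivative chain just described can be made concrete in a few lines of NumPy. This is a minimal sketch, not the pipeline function itself: the sampling rate and the sigmoid "approach" trajectory are made up, and for simplicity the amplitude is taken as net displacement rather than the integrated speed used in the pipeline code below.

```python
import numpy as np

# Hypothetical proximity trace: a smooth sigmoid "approach" sampled at 50 Hz
fs = 50.0
dt = 1.0 / fs
t = np.arange(0, 4, dt)
proximity = 1.0 / (1.0 + np.exp(-3.0 * (t - 2.0)))

# Successive numerical derivatives: proximity -> speed -> acceleration -> jerk
speed = np.gradient(proximity, dt)       # 1st derivative
acceleration = np.gradient(speed, dt)    # 2nd derivative
jerk = np.gradient(acceleration, dt)     # 3rd derivative

# Dimensionless squared jerk: integrate jerk^2 over the movement, then scale
# by duration^5 / amplitude^2 so that all units cancel out
duration = t[-1] - t[0]
amplitude = abs(proximity[-1] - proximity[0])   # net displacement (simplified)
integrated_squared_jerk = np.sum(jerk ** 2) * dt  # rectangle-rule integral
dsj = integrated_squared_jerk * duration ** 5 / amplitude ** 2
print(np.log(dsj))  # the log keeps the value in a manageable range
```

A smoother trajectory (e.g., a slower, cleaner sigmoid) yields a lower log dimensionless squared jerk; adding noise to `proximity` inflates it rapidly, since jerk amplifies high-frequency fluctuations.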
While integrating over the jerk time series has been a popular measure of smoothness, it is affected by the amplitude and duration of movements, which can make comparisons between different movements challenging. A solution is to calculate the dimensionless (squared) jerk. The calculation results in a single (individual-level), positive value. The mathematical underpinnings of this calculation are outside the scope of this notebook. What is important to note here is that the measure (dimensionless jerk) was chosen based on theoretical reasoning relating to our primary research questions. Below is the function that calculates the dimensionless squared jerk metric from position data. This measure is based on taking the third derivative of position with respect to time (jerk), squaring it, integrating it over the movement duration, and then normalizing by the amplitude of the movement. The log of this value is taken to keep the values in a reasonable range.

::: {.callout-note .callout-installation collapse="true"}
## Pipeline code step 2, function for calculating dimensionless squared jerk 🚩

```{python, eval=TRUE, code_folding="hide"}
import pandas as pd
import os
import numpy as np
from scipy.signal import savgol_filter
from scipy import integrate
from scipy import interpolate  # needed for interp1d used below
import matplotlib.pyplot as plt
import seaborn as sns

def dimensionless_squared_jerk_from_position(ts, time):
    """
    Calculate the dimensionless squared jerk metric from position data.
    Parameters:
    -----------
    ts : array_like
        Position data points, should be a 1D numpy array
    time : array_like
        Time points corresponding to the ts data, should be a 1D numpy array

    Returns:
    --------
    float
        Dimensionless squared jerk metric or NaN if calculation fails
    """
    try:
        # First check the raw inputs
        ts = np.array(ts, dtype=float)
        time = np.array(time, dtype=float)
        print(f"Original shape - ts: {ts.shape}, time: {time.shape}")
        print(f"NaN count before processing - ts: {np.isnan(ts).sum()}, time: {np.isnan(time).sum()}")

        # Input validation before preprocessing
        if len(ts) != len(time):
            print(f"Error: Arrays must have the same length. ts: {len(ts)}, time: {len(time)}")
            return np.nan
        if len(ts) < 11:  # Minimum length for savgol filter
            print(f"Error: Input arrays too short for savgol window size=11. Length: {len(ts)}")
            return np.nan

        # Check if time has NaNs - we need to fix time first
        if np.isnan(time).any():
            print("Warning: Time array contains NaNs, filling with linear sequence")
            valid_time_mask = ~np.isnan(time)
            if not valid_time_mask.any():
                print("Error: All time values are NaN")
                return np.nan
            # Create a proper time sequence
            time = np.linspace(np.nanmin(time), np.nanmax(time), len(time))

        # Check if input data is all NaN
        if np.isnan(ts).all():
            print("Error: All input values are NaN")
            return np.nan

        # Identify valid data points for interpolation
        valid_mask = ~np.isnan(ts)
        valid_indices = np.where(valid_mask)[0]
        if len(valid_indices) < 2:
            print(f"Error: Not enough valid data points for interpolation. Found {len(valid_indices)}")
            return np.nan

        # Handle the interpolation more carefully
        # 1. Use valid points to interpolate
        if np.isnan(ts).any():
            try:
                # Create interpolator with valid points only
                valid_time = time[valid_mask]
                valid_data = ts[valid_mask]
                # Sort by time to ensure proper interpolation
                sort_idx = np.argsort(valid_time)
                valid_time = valid_time[sort_idx]
                valid_data = valid_data[sort_idx]
                # Create interpolation function
                f = interpolate.interp1d(
                    valid_time, valid_data,
                    bounds_error=False,
                    fill_value=(valid_data[0], valid_data[-1])  # Extrapolate with edge values
                )
                # Apply interpolation
                ts_interpolated = f(time)
                # Check if interpolation fixed all NaNs
                if np.isnan(ts_interpolated).any():
                    print(f"Warning: Interpolation failed to fix all NaNs. Remaining: {np.isnan(ts_interpolated).sum()}")
                    # Last resort: replace any remaining NaNs with nearest valid value
                    ts_interpolated = pd.Series(ts_interpolated).fillna(method='ffill').fillna(method='bfill').values
                ts = ts_interpolated
            except Exception as e:
                print(f"Error during interpolation: {str(e)}")
                return np.nan

        # Verify no NaNs remain after preprocessing
        if np.isnan(ts).any() or np.isnan(time).any():
            print("Error: Failed to remove all NaN values")
            return np.nan

        # Ensure time steps are uniform for derivative calculation
        uniform_time = np.linspace(time[0], time[-1], len(time))
        if not np.allclose(time, uniform_time):
            # If time is not uniform, resample the data
            print("Warning: Time steps not uniform, resampling data")
            # Use scipy's interpolation again to resample
            f = interpolate.interp1d(time, ts, bounds_error=False, fill_value='extrapolate')
            ts = f(uniform_time)
            time = uniform_time

        # Calculate the time step
        dt = np.diff(time)[0]
        if dt <= 0:
            print(f"Error: Time steps must be positive. Got dt={dt}")
            return np.nan

        # Print state after preprocessing
        print(f"Data after preprocessing - Length: {len(ts)}")
        print(f"NaN check after preprocessing - ts: {np.isnan(ts).any()}, time: {np.isnan(time).any()}")
        print(f"Range - ts: {np.min(ts)} to {np.max(ts)}, time: {np.min(time)} to {np.max(time)}")

        # Calculate speed (exactly as in original)
        speed = np.gradient(ts, dt)
        sns.lineplot(data=speed)
        # Smooth with a savgol filter (maintaining original settings)
        speed = savgol_filter(speed, 11, 3)
        # Calculate acceleration (exactly as in original)
        acceleration = np.gradient(speed, dt)
        # Smooth (maintaining original settings)
        acceleration = savgol_filter(acceleration, 11, 3)
        sns.lineplot(data=acceleration)
        # Calculate jerk (exactly as in original)
        jerk = np.gradient(acceleration, dt)
        sns.lineplot(data=jerk)
        # Smooth (maintaining original settings)
        jerk = savgol_filter(jerk, 11, 3)

        # Calculate movement duration (D)
        movement_duration = time[-1] - time[0]
        if movement_duration <= 0:
            print(f"Error: Movement duration must be positive. Got {movement_duration}")
            return np.nan

        # Calculate movement amplitude by integrating speed
        position = integrate.simpson(speed, x=time)
        movement_amplitude = abs(position)  # Use absolute value to ensure positive

        # Prevent division by zero
        epsilon = 1e-10
        if movement_amplitude < epsilon:
            print(f"Warning: Movement amplitude very small ({movement_amplitude}). Using epsilon.")
            movement_amplitude = epsilon

        # Calculate the squared jerk
        squared_jerk = jerk ** 2
        # Integrate the squared jerk
        integrated_squared_jerk = integrate.simpson(squared_jerk, x=time)
        # Ensure positive value for integral of squared jerk
        if integrated_squared_jerk < 0:
            print(f"Warning: Negative integral of squared jerk: {integrated_squared_jerk}. Using absolute value.")
            integrated_squared_jerk = abs(integrated_squared_jerk)

        # Calculate the dimensionless squared jerk
        dimensionless_jerk = integrated_squared_jerk * (movement_duration**5 / movement_amplitude**2)
        return dimensionless_jerk
    except Exception as e:
        print(f"Error calculating dimensionless squared jerk: {str(e)}")
        return np.nan
```
:::

## Example smoothness results on simulated time series data

In the example below we show how the two smoothness metrics behave for three different time series. The first is a smooth sinusoidal signal, the second a noisy version of the first, and the third a more noisy version still. The spectral arc length metric should be higher for the smooth signal, while the dimensionless squared jerk metric should be lower for the smooth signal (as it is less jerky).

::: {.callout-note .callout-installation collapse="true"}
## Example code for checking behavior of the smoothness metrics 🏴

```{python, code_folding="hide"}
import matplotlib.pyplot as plt
import numpy as np

# Generate the time series data
ts1 = np.arange(5, 10, 0.01)
ts2 = np.arange(5, 10, 0.01) + np.random.normal(0, 0.1, len(ts1))
ts3 = np.arange(5, 10, 0.01) + np.random.normal(0, 0.5, len(ts1))

# Create sinusoidal time series
time_axis = np.arange(5, 10, 0.01)
ts1 = np.sin(ts1)
ts2 = np.sin(ts2)
ts3 = np.sin(ts3)

# Calculate metrics (using the functions defined above)
spects1 = spectral_arclength(ts1, fs=1)
spects2 = spectral_arclength(ts2, fs=1)
spects3 = spectral_arclength(ts3, fs=1)
jerkts1 = dimensionless_squared_jerk_from_position(ts1, time_axis)
jerkts2 = dimensionless_squared_jerk_from_position(ts2, time_axis)
jerkts3 = dimensionless_squared_jerk_from_position(ts3, time_axis)

# Create figure with subplots - 1x3 layout
fig, axes = plt.subplots(1, 3, figsize=(18, 9))
fig.suptitle('Time Series Analysis: Spectral Arc Length and Dimensionless Squared Jerk',
             fontsize=16, fontweight='bold', y=0.98)

# Colors for consistent styling
colors = ['#1f77b4', '#ff7f0e', '#d62728']  # Blue, Orange, Red

# Plot 1: Individual signals separated (left)
ax1 = axes[0]
offset = 3  # Vertical separation between signals
ax1.plot(time_axis, ts1 + 2*offset, color=colors[0], linewidth=2, label='Smooth Signal')
ax1.plot(time_axis, ts2 + offset, color=colors[1], linewidth=2, label='Low Noise Signal')
ax1.plot(time_axis, ts3, color=colors[2], linewidth=2, label='High Noise Signal')
ax1.set_title('Separated Time Series', fontweight='bold', fontsize=12)
ax1.set_xlabel('Time')
ax1.set_ylabel('Amplitude (Offset)')
ax1.legend(fontsize=10)
ax1.grid(True, alpha=0.3)

# Plot 2: Metrics comparison - Spectral Arc Length (middle)
ax2 = axes[1]
signals = ['Smooth\nSignal', 'Low Noise\nSignal', 'High Noise\nSignal']
spect_values = [spects1[0], spects2[0], spects3[0]]
bars1 = ax2.bar(signals, spect_values, color=colors, alpha=0.7, edgecolor='black', linewidth=1)
ax2.set_title('Spectral Arc Length Comparison', fontweight='bold', fontsize=12)
ax2.set_ylabel('Spectral Arc Length')
ax2.grid(True, alpha=0.3, axis='y')

# Add value labels on bars
for bar, value in zip(bars1, spect_values):
    height = bar.get_height()
    ax2.text(bar.get_x() + bar.get_width()/2., height + 0.01,
             f'{value:.3f}', ha='center', va='bottom', fontweight='bold', fontsize=10)

# Plot 3: Metrics comparison - Dimensionless Squared Jerk (right)
ax3 = axes[2]
jerk_values = [jerkts1, jerkts2, jerkts3]
bars2 = ax3.bar(signals, jerk_values, color=colors, alpha=0.7, edgecolor='black', linewidth=1)
ax3.set_title('Dimensionless Squared Jerk Comparison', fontweight='bold', fontsize=12)
ax3.set_ylabel('Dimensionless Squared Jerk')
ax3.grid(True, alpha=0.3, axis='y')

# Add value labels on bars
for bar, value in zip(bars2, jerk_values):
    height = bar.get_height()
    ax3.text(bar.get_x() + bar.get_width()/2., height + max(jerk_values)*0.02,
             f'{value:.3f}', ha='center', va='bottom', fontweight='bold', fontsize=10)

# Adjust layout to prevent overlap
plt.tight_layout(rect=[0, 0.12, 1, 0.95])
#plt.show()
# Save plot
plt.savefig('./images/smoothness_metrics_comparison.png', dpi=300, bbox_inches='tight')
```
:::

In the pipeline code for step 2 below, we calculate smoothness for the distance between the two hands, as well as the distance to the center of mass (COM) and centroid of the two hands. We also calculate the smoothness of the wrist speed profiles, which are derived from the distance time series data. When SPARC appears in a variable name, it refers to the spectral arc length metric, while smoothness refers to the dimensionless squared jerk metric.

::: {.callout-note .callout-installation collapse="true"}
## Pipeline code step 2 complete procedure 🚩

```{python, eval=FALSE, code_folding="hide"}
import matplotlib.pyplot as plt
import seaborn as sns
import tempfile
from scipy import integrate
from scipy import interpolate
from scipy.signal import savgol_filter
import numpy as np
import pandas as pd
import os
import glob

# Specifically, we might be interested in computing the smoothness of the distance
inputfol = '../dataoutput_STEP1_2_timeseries/'
outputfol = '../dataoutput_STEP2_features/'
metadata = pd.read_csv('../meta/project_pointsibling_metadata_starttimes.csv', encoding='latin1')
constantwindowsize_sec = 100  # we want an equal portion of each timeseries to be analyzed

# Function to calculate spectral arc length
def spectral_arclength(movement, fs, padlevel=4, fc=10.0, amp_th=0.05):
    """
    Calculates the smoothness of the given speed profile using the modified
    spectral arc length metric.

    Parameters
    ----------
    movement : np.array
        The array containing the movement speed profile.
    fs : float
        The sampling frequency of the data.
    padlevel : integer, optional
        Indicates the amount of zero padding to be done to the movement
        data for estimating the spectral arc length. [default = 4]
    fc : float, optional
        The max. cut off frequency for calculating the spectral arc
        length metric. [default = 10.]
    amp_th : float, optional
        The amplitude threshold to be used for determining the cut off
        frequency up to which the spectral arc length is to be estimated.
        [default = 0.05]

    Returns
    -------
    sal : float
        The spectral arc length estimate of the given movement's smoothness.
    (f, Mf) : tuple of two np.arrays
        The frequency (f) and the magnitude spectrum (Mf) of the given
        movement data. This spectrum runs from 0 to fs/2.
    (f_sel, Mf_sel) : tuple of two np.arrays
        The portion of the spectrum that is selected for calculating the
        spectral arc length.

    Notes
    -----
    This is the modified spectral arc length metric, which has been tested
    only for discrete movements. It is suitable for movements that are a few
    seconds long; for long movements it might be slow and the results might
    not make sense (like any other smoothness metric).

    Examples
    --------
    >>> t = np.arange(-1, 1, 0.01)
    >>> move = np.exp(-5*pow(t, 2))
    >>> sal, _, _ = spectral_arclength(move, fs=100.)
    >>> '%.5f' % sal
    '-1.41403'
    """
    # Number of zeros to be padded.
    nfft = int(pow(2, np.ceil(np.log2(len(movement))) + padlevel))
    # Frequency
    f = np.arange(0, fs, fs/nfft)
    # Normalized magnitude spectrum
    Mf = abs(np.fft.fft(movement, nfft))
    Mf = Mf/max(Mf)
    # Indices to choose only the spectrum within the given cut off frequency Fc.
    # NOTE: This is a low pass filtering operation to get rid of high frequency
    # noise from affecting the next step (amplitude threshold based cut off for
    # arc length calculation).
    fc_inx = ((f <= fc)*1).nonzero()
    f_sel = f[fc_inx]
    Mf_sel = Mf[fc_inx]
    # Choose the amplitude threshold based cut off frequency.
    # Index of the last point on the magnitude spectrum that is greater than
    # or equal to the amplitude threshold.
    inx = ((Mf_sel >= amp_th)*1).nonzero()[0]
    fc_inx = range(inx[0], inx[-1]+1)
    f_sel = f_sel[fc_inx]
    Mf_sel = Mf_sel[fc_inx]
    # Calculate arc length
    new_sal = -sum(np.sqrt(pow(np.diff(f_sel)/(f_sel[-1] - f_sel[0]), 2) +
                           pow(np.diff(Mf_sel), 2)))
    return new_sal, (f, Mf), (f_sel, Mf_sel)

# Function to calculate the dimensionless squared jerk metric from position data
def dimensionless_squared_jerk_from_position(ts, time):
    """
    Calculate the dimensionless squared jerk metric from position data.

    Parameters:
    -----------
    ts : array_like
        Position data points, should be a 1D numpy array
    time : array_like
        Time points corresponding to the ts data, should be a 1D numpy array

    Returns:
    --------
    float
        Dimensionless squared jerk metric or NaN if calculation fails
    """
    try:
        # First check the raw inputs
        ts = np.array(ts, dtype=float)
        time = np.array(time, dtype=float)
        print(f"Original shape - ts: {ts.shape}, time: {time.shape}")
        print(f"NaN count before processing - ts: {np.isnan(ts).sum()}, time: {np.isnan(time).sum()}")

        # Input validation before preprocessing
        if len(ts) != len(time):
            print(f"Error: Arrays must have the same length. ts: {len(ts)}, time: {len(time)}")
            return np.nan
        if len(ts) < 11:  # Minimum length for savgol filter
            print(f"Error: Input arrays too short for savgol window size=11. Length: {len(ts)}")
            return np.nan

        # Check if time has NaNs - we need to fix time first
        if np.isnan(time).any():
            print("Warning: Time array contains NaNs, filling with linear sequence")
            valid_time_mask = ~np.isnan(time)
            if not valid_time_mask.any():
                print("Error: All time values are NaN")
                return np.nan
            # Create a proper time sequence
            time = np.linspace(np.nanmin(time), np.nanmax(time), len(time))

        # Check if input data is all NaN
        if np.isnan(ts).all():
            print("Error: All input values are NaN")
            return np.nan

        # Identify valid data points for interpolation
        valid_mask = ~np.isnan(ts)
        valid_indices = np.where(valid_mask)[0]
        if len(valid_indices) < 2:
            print(f"Error: Not enough valid data points for interpolation. Found {len(valid_indices)}")
            return np.nan

        # Handle the interpolation more carefully
        # 1. Use valid points to interpolate
        if np.isnan(ts).any():
            try:
                # Create interpolator with valid points only
                valid_time = time[valid_mask]
                valid_data = ts[valid_mask]
                # Sort by time to ensure proper interpolation
                sort_idx = np.argsort(valid_time)
                valid_time = valid_time[sort_idx]
                valid_data = valid_data[sort_idx]
                # Create interpolation function
                f = interpolate.interp1d(
                    valid_time, valid_data,
                    bounds_error=False,
                    fill_value=(valid_data[0], valid_data[-1])  # Extrapolate with edge values
                )
                # Apply interpolation
                ts_interpolated = f(time)
                # Check if interpolation fixed all NaNs
                if np.isnan(ts_interpolated).any():
                    print(f"Warning: Interpolation failed to fix all NaNs. Remaining: {np.isnan(ts_interpolated).sum()}")
                    # Last resort: replace any remaining NaNs with nearest valid value
                    ts_interpolated = pd.Series(ts_interpolated).fillna(method='ffill').fillna(method='bfill').values
                ts = ts_interpolated
            except Exception as e:
                print(f"Error during interpolation: {str(e)}")
                return np.nan

        # Verify no NaNs remain after preprocessing
        if np.isnan(ts).any() or np.isnan(time).any():
            print("Error: Failed to remove all NaN values")
            return np.nan

        # Ensure time steps are uniform for derivative calculation
        uniform_time = np.linspace(time[0], time[-1], len(time))
        if not np.allclose(time, uniform_time):
            # If time is not uniform, resample the data
            print("Warning: Time steps not uniform, resampling data")
            # Use scipy's interpolation again to resample
            f = interpolate.interp1d(time, ts, bounds_error=False, fill_value='extrapolate')
            ts = f(uniform_time)
            time = uniform_time

        # Calculate the time step
        dt = np.diff(time)[0]
        if dt <= 0:
            print(f"Error: Time steps must be positive. Got dt={dt}")
            return np.nan

        # Print state after preprocessing
        print(f"Data after preprocessing - Length: {len(ts)}")
        print(f"NaN check after preprocessing - ts: {np.isnan(ts).any()}, time: {np.isnan(time).any()}")
        print(f"Range - ts: {np.min(ts)} to {np.max(ts)}, time: {np.min(time)} to {np.max(time)}")

        # Calculate speed (exactly as in original)
        speed = np.gradient(ts, dt)
        # Smooth with a savgol filter (maintaining original settings)
        speed = savgol_filter(speed, 11, 3)
        # Calculate acceleration (exactly as in original)
        acceleration = np.gradient(speed, dt)
        # Smooth (maintaining original settings)
        acceleration = savgol_filter(acceleration, 11, 3)
        # Calculate jerk (exactly as in original)
        jerk = np.gradient(acceleration, dt)
        # Smooth (maintaining original settings)
        jerk = savgol_filter(jerk, 11, 3)

        # Calculate movement duration (D)
        movement_duration = time[-1] - time[0]
        if movement_duration <= 0:
            print(f"Error: Movement duration must be positive. Got {movement_duration}")
            return np.nan

        # Calculate movement amplitude by integrating speed
        position = integrate.simpson(speed, x=time)
        movement_amplitude = abs(position)  # Use absolute value to ensure positive

        # Prevent division by zero
        epsilon = 1e-10
        if movement_amplitude < epsilon:
            print(f"Warning: Movement amplitude very small ({movement_amplitude}). Using epsilon.")
            movement_amplitude = epsilon

        # Calculate the squared jerk
        squared_jerk = jerk ** 2
        # Integrate the squared jerk
        integrated_squared_jerk = integrate.simpson(squared_jerk, x=time)
        # Ensure positive value for integral of squared jerk
        if integrated_squared_jerk < 0:
            print(f"Warning: Negative integral of squared jerk: {integrated_squared_jerk}. Using absolute value.")
            integrated_squared_jerk = abs(integrated_squared_jerk)

        # Calculate the dimensionless squared jerk
        dimensionless_jerk = integrated_squared_jerk * (movement_duration**5 / movement_amplitude**2)

        # Final sanity check
        if np.isnan(dimensionless_jerk) or np.isinf(dimensionless_jerk):
            print(f"Warning: Result is {dimensionless_jerk}. Details:")
            print(f"  Movement duration: {movement_duration}")
            print(f"  Movement amplitude: {movement_amplitude}")
            print(f"  Integrated squared jerk: {integrated_squared_jerk}")
            return np.nan

        # Log the result
        print(f"Dimensionless squared jerk: {dimensionless_jerk}")
        # Take the log of the dimensionless jerk
        dimensionless_jerk = np.log(dimensionless_jerk)
        return dimensionless_jerk
    except Exception as e:
        print(f"Error calculating dimensionless squared jerk: {str(e)}")
        return np.nan

# df for smoothness data
newdfcolumns = ['videoID', 'timeadjusted', 'originalsamples', 'adjustedsamples',
                'start_time_analysiswindow', 'end_time_analysiswindow',
                'perc_twopersonsdetected',
                'average_com_movementp1', 'average_com_movementp2',
                'smoothness_distancecom', 'SPARC_smoothness_distancecom',
                'smoothness_distancecentroid',
                'smoothness_xy_average_com_p1', 'smoothness_xy_average_com_p2',
                'smoothness_xy_average_centroid_p1', 'smoothness_xy_average_centroid_p2',
                'smoothness_p1_proximity', 'smoothness_p2_proximity']
newdf = pd.DataFrame(columns=newdfcolumns)

# Check for each csv file for layer2 data
layer2dat = glob.glob(inputfol + '*.csv')
#print(layer2dat)

# Loop over the csv layer 2 data
for vids in layer2dat:
    print(vids)
    # Load the CSV timeseries file
    ts = pd.read_csv(vids)

    # Get the features
    videoID = os.path.basename(vids).split('_processed_data_layer2.csv')[0]
    originalsamples = len(ts["time"])
    perc_twopersonsdetected = ts['both_tracked'].sum() / len(ts)

    # Check metadata start
    start = metadata[metadata['VIDEO_ID'] == videoID]['start'].values
    print("start time of this video: " + str(start))

    # Calculate the average movement of the com for each person
    average_com_movementp1 = np.mean(np.sqrt((ts['com_p1_x'].diff()**2 + ts['com_p1_y'].diff()**2)))
    average_com_movementp2 = np.mean(np.sqrt((ts['com_p2_x'].diff()**2 + ts['com_p2_y'].diff()**2)))

    # Add a time adjusted variable to the dataset
    if start.size > 0:
        # Check that the end time is not greater than the last time in the dataset
        if (start[0] + constantwindowsize_sec) > ts['time'].max():
            print("End time is greater than the last time in the dataset, setting end time to max value")
        timeadjusted = ("TRUE - Adjusted to start at + " + str(start[0]) +
                        " With a window length of: " + str(constantwindowsize_sec) + " seconds")
        # Take a timeseries chunk of constantwindowsize_sec seconds
        ts = ts[(ts['time'] >= start[0]) & (ts['time'] <= start[0] + constantwindowsize_sec)]
    if start.size == 0:
        timeadjusted = ("FALSE - Not adjusted to start time as no start time was given for this video, "
                        "window length is still set to: " + str(constantwindowsize_sec) + " seconds")
        ts = ts[(ts['time'] <= constantwindowsize_sec)]
    adjustedsamples = len(ts["time"])
    print(timeadjusted)

    # fps is the mode of timestamps per second
    fps = 1/(ts['time'].diff().mode()[0])

    # Add time start and time end
    start_time_analysiswindow = start[0] if start.size > 0 else 0
    end_time_analysiswindow = start_time_analysiswindow + constantwindowsize_sec

    # Calculate the smoothness of the distance between the com and centroid
    smoothness_distancecom = dimensionless_squared_jerk_from_position(ts['distance_com'].values, ts['time'].values)
    # Take the derivative of the distance
    distancecomspeed = np.gradient(ts['distance_com'].values, ts['time'].values)
    SPARCsmoothness_distancecom = spectral_arclength(distancecomspeed, 1/fps, padlevel=4, fc=10.0, amp_th=0.05)
    # The metric itself is the first value of the returned tuple
    SPARCsmoothness_distancecom = SPARCsmoothness_distancecom[0]
    #print(smoothness_distancecom)
    smoothness_distancecentroid = dimensionless_squared_jerk_from_position(ts['distance'].values, ts['time'].values)

    # Calculate the smoothness of the xy positions for each person
    smoothness_xy_average_com_p1 = (dimensionless_squared_jerk_from_position(ts['com_p1_x'], ts['time'].values) +
                                    dimensionless_squared_jerk_from_position(ts['com_p1_y'], ts['time'].values)) / 2
    smoothness_xy_average_com_p2 = (dimensionless_squared_jerk_from_position(ts['com_p2_x'], ts['time'].values) +
                                    dimensionless_squared_jerk_from_position(ts['com_p2_y'], ts['time'].values)) / 2
    smoothness_xy_average_centroid_p1 = (dimensionless_squared_jerk_from_position(ts['centroid_p1_x'], ts['time'].values) +
                                         dimensionless_squared_jerk_from_position(ts['centroid_p1_y'], ts['time'].values)) / 2
    smoothness_xy_average_centroid_p2 = (dimensionless_squared_jerk_from_position(ts['centroid_p2_x'], ts['time'].values) +
                                         dimensionless_squared_jerk_from_position(ts['centroid_p2_y'], ts['time'].values)) / 2

    # Calculate the smoothness of the proximity approach
    smoothness_p1_proximity = dimensionless_squared_jerk_from_position(ts['p1_com_approach_pos'].values, ts['time'].values)
    smoothness_p2_proximity = dimensionless_squared_jerk_from_position(ts['p2_com_approach_pos'].values, ts['time'].values)

    # Append to the new df using concat
    newdf = pd.concat([newdf, pd.DataFrame([[videoID, timeadjusted, originalsamples, adjustedsamples,
                                             start_time_analysiswindow, end_time_analysiswindow,
                                             perc_twopersonsdetected,
                                             average_com_movementp1, average_com_movementp2,
                                             smoothness_distancecom, SPARCsmoothness_distancecom,
                                             smoothness_distancecentroid,
                                             smoothness_xy_average_com_p1, smoothness_xy_average_com_p2,
                                             smoothness_xy_average_centroid_p1, smoothness_xy_average_centroid_p2,
                                             smoothness_p1_proximity, smoothness_p2_proximity]],
                                           columns=newdfcolumns)], ignore_index=True)

# Save the new df
newdf.to_csv(outputfol + 'smoothness_data.csv', index=False, encoding='latin1')
newdf.head()
# Print done
print("Done with smoothness processing pipeline!")
```
:::

::: {.callout-note .callout-installation collapse="true"}
## Example output smoothness 📊

For each video we will now have a smoothness feature set.

```{python examplecsvfilelayer20}
import pandas as pd
import glob as glob
import os

# Load the CSV file
csv_file = "./dataoutput_STEP2_features/smoothness_data.csv"
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
```
:::

### Extract features for all videos using tsfresh

To show how easy it is to extract features from the time series data, we use the `tsfresh` library [@christTimeSeriesFeatuRe2018].
This library contains a large number of features designed to characterize time series data. While we do not use them in our analysis, it provides a convenient example of how easy it is to extract features from the created time series in a data-driven way. The user can of course change this code to add more features or to extract only a subset of features.

::: {.callout-note .callout-installation collapse="true"}
## Extracting more features example tsfresh 🏴

```{python, code_folding="hide"}
import pandas as pd
import glob
import os
from tsfresh import extract_features

# Initialize list to store features from each file
all_features = []
timeseries = glob.glob('./dataoutput_STEP1_2_timeseries/*_processed_data_layer2.csv')

# for checking, let's do the first 2 time series
timeseries = timeseries[:2]  # comment this line to process all files

# Process each file individually
for file in timeseries:
    ts_data = pd.read_csv(file)
    video_id = file.split('/')[-1].split('_processed')[0]

    # Clean NaN values for the current file
    distance_data = ts_data['distance_com'].copy()
    distance_data = distance_data.interpolate(method='linear')
    distance_data = distance_data.fillna(method='ffill')
    distance_data = distance_data.fillna(method='bfill')

    # Create DataFrame for current file only
    current_data = pd.DataFrame({
        'id': video_id,
        'time': range(len(distance_data)),
        'distance_com': distance_data
    })

    # Extract features for current file only
    current_features = extract_features(current_data, column_id='id', column_sort='time')

    # Add to our collection
    all_features.append(current_features)

    # Optional: print progress
    print(f"Processed {video_id} - extracted {len(current_features.columns)} features")

    # Clear variables to free memory
    del ts_data, distance_data, current_data, current_features

# Combine all features at the end
combined_features = pd.concat(all_features, ignore_index=True)
print(f"Total: Extracted {len(combined_features.columns)} features from {len(combined_features)} videos")
combined_features.to_csv('./dataoutput_STEP2_features/tsfresh_features_fromCOMdistance.csv')
```
:::

::: {.callout-note .callout-installation collapse="true"}
## Example output tsfresh: 📊

For each video we will now have a tsfresh feature set.

```{python examplecsvfilelayer21}
import pandas as pd
import glob as glob
import os

# Load the CSV file
csv_file = "./dataoutput_STEP2_features/tsfresh_features_fromCOMdistance.csv"
df = pd.read_csv(csv_file)

# Display the first few rows of the DataFrame
print(df.head())
```
:::

::: {.callout-note .callout-settingup collapse="true"}
## DIY STEP 2: Feature Extraction 🛠️

### Step 1: Install requirements

For each script we have provided a `requirements.txt` file. You can install the requirements using pip, by first navigating to the folder where the script is located and then running the following command in your terminal:

```bash
cd [yourspecificrootadress]/code_STEP2_smoothness_features/
pip install -r requirements.txt
```

### Step 2: Run the python script

Assuming you have timeseries data (`./dataoutput_STEP1_2_timeseries/`), you can run the script using the following command:

```bash
cd [yourspecificrootadress]/code_STEP2_smoothness_features/
python STEP2_featureextraction.py
```
:::

# Step 3: Non-linear time series analysis (R)

The development of infant-caregiver interaction was studied using Cross-Recurrence Quantification Analysis (CRQA). CRQA is a nonlinear time series analysis technique that can be used to study synchronization and the strength and direction of coupling dynamics between two systems (Marwan & Kurths, 2002; Shockley et al., 2002). Specifically, we used CRQA to quantify the strength and direction of coupling between infant and caregiver based on changes in their centre of mass during the 7 lab visits.
To assess whether the overall strength of coupling changes over time, we will use a generalized mixed model with the CRQA measure of coupling strength (determinism) as the dependent variable and age and dyad relation as predictors.

Using CRQA we can quantify the extent to which the dynamics of two different systems are coupled over time. To do so, we evaluate whether the two systems occupy the same regions of a shared multidimensional phase space, in which each coordinate represents a state and a trajectory represents the evolution of system states over time. We can reconstruct a phase space trajectory from a single observed time series by creating delayed copies that serve as so-called surrogate dimensions. Takens' theorem [@takens1981a] guarantees that the reconstructed trajectory is topologically equivalent to the original: the important dynamical properties are recovered, not the exact trajectory itself. The procedure involves determining a time lag (embedding delay) with which to construct the surrogate dimensions (we used the first local minimum of the mutual information function, cf. [@fraserIndependentCoordinatesStrange1986]) and determining how many surrogate dimensions are needed (we used False Nearest Neighbor analysis, cf. [@kennelDeterminingEmbeddingDimension1992]). We obtained an estimate of the delay and the number of embedding dimensions for each COM time series in the data set, and used the maximum delay (828) and maximum embedding dimension (3) for all CRQA analyses.

For each dyad we now have 2 multivariate time series that can be thought of as trajectories through a reconstructed 3-dimensional phase space (cf. [@hasselmanEarlyWarningSignals2022]). The next step is to create a distance matrix in which each cell represents the Euclidean distance between the coordinates of the two systems (see Figure below). The cells that form the main diagonal represent the distance between the states observed at exactly the same point in time.
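For intuition, the embedding, distance, and thresholding computations used in this step can be sketched in plain `numpy`. This is a simplified, self-contained illustration with toy parameter values (lag 10, 3 dimensions), not the `casnet` implementation used for the actual analyses; the function names are our own.

```python
import numpy as np

def delay_embed(x, dim, lag):
    """Stack lagged copies of a 1D series into an (n_states, dim) trajectory."""
    n = len(x) - (dim - 1) * lag
    return np.column_stack([x[i * lag:i * lag + n] for i in range(dim)])

def cross_recurrence(x, y, dim, lag, rr=0.05):
    """Binary cross-recurrence matrix thresholded to a fixed recurrence rate."""
    ex, ey = delay_embed(x, dim, lag), delay_embed(y, dim, lag)
    # Euclidean distance between every pair of reconstructed states
    d = np.linalg.norm(ex[:, None, :] - ey[None, :, :], axis=-1)
    eps = np.quantile(d, rr)  # radius that marks ~5% of cells as recurrent
    return d <= eps

def determinism(rm, lmin=2):
    """Proportion of recurrent points lying on diagonal lines of length >= lmin."""
    total = rm.sum()
    on_lines = 0
    for k in range(-rm.shape[0] + 1, rm.shape[1]):
        diag = np.diagonal(rm, offset=k).astype(int)
        # lengths of runs of consecutive recurrent points along this diagonal
        padded = np.concatenate([[0], diag, [0]])
        starts = np.flatnonzero(np.diff(padded) == 1)
        ends = np.flatnonzero(np.diff(padded) == -1)
        runs = ends - starts
        on_lines += runs[runs >= lmin].sum()
    return on_lines / total if total else float("nan")

# Two smoothly coupled series should yield a higher DET than white noise
t = np.linspace(0, 8 * np.pi, 400)
rng = np.random.default_rng(1)
rm_sine = cross_recurrence(np.sin(t), np.sin(t + 0.3), dim=3, lag=10)
rm_noise = cross_recurrence(rng.standard_normal(400), rng.standard_normal(400), dim=3, lag=10)
print(determinism(rm_sine), determinism(rm_noise))
```

Note how the recurrence rate is fixed by construction (the threshold is the 5% quantile of the distances), mirroring the fixed 5% Recurrence Rate used in the analyses.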
Moving away from the main diagonal, cells represent the distance to a state that occurred at an earlier or later point in time, relative to the time series on the rows or the columns. The next step is to choose a threshold value that determines which coordinates count as recurring states, yielding a binary matrix called the recurrence matrix, containing only 0s and 1s. For each dyad and measurement occasion, a threshold was chosen that yielded a matrix in which a fixed number of cells contained a 1; these cells represent states that are very similar and occur in both time series. The proportion of such cells is known as the Recurrence Rate, which we set at 5%. Recurrent states around the main diagonal occurred at approximately the same moment in time; recurrent states below or above the main diagonal occurred earlier (or later) relative to the time series on the X or Y axis.

Many different measures can be calculated from the recurrence matrix. In the present study we focus on determinism (DET), the proportion of recurrent points that form a diagonal line out of the total number of recurrent points. Diagonal lines indicate that a consecutive sequence of states in one time series is repeated in the other in almost exactly the same way; in other words, both systems followed the same trajectory through state space for a while. Determinism is therefore generally considered a measure of the strength of the coupling between the two systems.

![Figure S8. Cross-recurrence plot. The two time series P1 and P2 are delay-embedded using a lag of 838 to create three surrogate dimensions. For each timepoint in P1, a distance is calculated to every other time point in P2 and vice versa. A threshold value is chosen to create a binary cross-recurrence matrix with 5% recurrent points.
All measures are calculated based on the line structures (done with the `R` package `casnet`, [@hasselmanCasnetToolboxStudying2025])](./images/CRP_figure.jpeg)

::: {.callout-note .callout-settingup collapse="true"}
## CRQA analysis in R 🚩

{{< include ./code_STEP3_nonlinear_analysis/STEP3_nonlinearfeatures_CRQA.html >}}
:::

::: {.callout-note .callout-settingup collapse="true"}
## DIY: CRQA 🛠️

In order to run the more complex CRQA analysis described above, open the file *./code_STEP3_nonlinear_analysis/STEP3_nonlinearfeatures_CRQA.Rmd* in R Studio. Before running the markdown file, make sure you have a few packages installed (run this in R or R Studio):

*install.packages(c("rio","plyr","tidyverse","invctr","fpCompare","gplots","testthat"))*

*devtools::install_github("FredHasselman/casnet")*
:::

# Step 4: Statistical analysis (R)

## Step 4.1: Statistical report on smoothness analysis in R

For the individual-level analysis of the smoothness of individual approach/avoidance behavior, we fit a linear mixed model using `lme4` [@batesFittingLinearMixedEffects2015] with the individual smoothness metric as the outcome variable, culture (Yurakaré or Polish) as the predictor variable, and a random intercept to account for individuals being nested within sibling pairs. Overall, the model provided no evidence of an effect of culture on the smoothness of individual approach/avoidance movements (β = 0.12, 95% CI [−0.53, 0.77], marginal R² = .004). For the Polish group, the estimated marginal mean was 38.3 (SE = 0.45, 95% CI [37.3, 39.2]); for the Yurakaré group, it was 39.5 (SE = 0.45, 95% CI [37.6, 49.5]). The figure below shows the overall pattern.
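As an aside, cluster-robust (Liang-Zeger, CR0) standard errors of the kind provided by `clubSandwich` in R can be sketched in a few lines of `numpy`. This is a toy analogue for illustration only; the function name, the synthetic sibling-pair data, and the effect sizes below are ours, not values from the study.

```python
import numpy as np

def ols_cluster_robust(X, y, clusters):
    """OLS estimates with CR0 (Liang-Zeger) cluster-robust standard errors."""
    X = np.column_stack([np.ones(len(y)), X])  # prepend an intercept column
    bread = np.linalg.inv(X.T @ X)
    beta = bread @ X.T @ y
    resid = y - X @ beta
    # "meat": sum over clusters of (X_g' u_g)(X_g' u_g)'
    meat = np.zeros((X.shape[1], X.shape[1]))
    for g in np.unique(clusters):
        score = X[clusters == g].T @ resid[clusters == g]
        meat += np.outer(score, score)
    vcov = bread @ meat @ bread  # sandwich estimator
    return beta, np.sqrt(np.diag(vcov))

# Toy data: smoothness scores for individuals nested in 30 sibling pairs
rng = np.random.default_rng(7)
pairs = np.repeat(np.arange(30), 2)         # 2 siblings per pair
culture = (pairs < 15).astype(float)        # dummy-coded culture
y = 38.5 + 0.1 * culture + rng.normal(0, 2, 60) + rng.normal(0, 1, 30)[pairs]
beta, se = ols_cluster_robust(culture[:, None], y, pairs)
print(beta, se)
```

The point estimates are identical to ordinary OLS; only the standard errors change, which is why this check is useful when a mixed model has singular-fit problems.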
In order to evaluate whether the results of these models were biased by singular-fit issues in estimating the random effects, we ran an equivalent linear regression with cluster-robust standard error estimates [@mcneishUnnecessaryUbiquityHierarchical2017b] using the `clubSandwich` [@pustejovskyClubSandwichClusterRobustSandwich2025] `R` package, which yielded compatible results. For the dyadic distance between centers of mass, we fit a simple linear regression with the smoothness of the distance between the dyad members' centers of mass as the outcome variable and culture (Yurakaré vs Polish) as the predictor variable. Consistent with the prior analyses, we did not observe a significant effect of culture (β = 0.15, 95% CI [−0.81, 1.11], R² = .006). The estimated marginal mean for the Polish group was 39.0 (SE = 0.64, 95% CI [37.7, 40.3]), and for the Yurakaré group it was 39.3 (SE = 0.64, 95% CI [38.0, 40.6]).

::: {.callout-note .callout-settingup collapse="true"}
## Detailed smoothness analysis in R 🚩

{{< include ./code_STEP4_statisticalanalysis/STEP4_Smoothness_analysis.html >}}
:::

## Step 4.2: Statistical report on non-linear analysis in R

To analyze the development of parent-infant interactions, we fit a logistic (generalized) mixed model using `lme4` [@batesFittingLinearMixedEffects2015], predicting the determinism (DET) measure obtained from CRQA for each dyad at each measurement occasion. The age of the infant (in days) was used as the time predictor and was centered on the average age of the sample at the first measurement occasion. The framework of generalized mixed models allows for the estimation of proportion data based on counts, where the denominator may vary from case to case. In the present dataset, 6 dyads had 1 time series that was shorter than expected based on the selection criterion of 150 seconds (6 out of 86 dyadic series).
Time series length will affect the number of recurrent points that can be detected; therefore, we used the number of recurrent points detected as a weight variable in the model (see supplementary data for details). Linear and quadratic effects of the time variable (age in days) were included in the model, as well as an interaction with a categorical variable indicating the parent-infant relation: mother-daughter (N=6), mother-son (N=5), and father-daughter (N=1), with the latter as the reference category. The dyad ID was entered as a random effect in the model. The model predictions are displayed in the Figure below. The estimated marginal means at 92.6 days (the average age at the first measurement occasion) for the father-daughter dyads (M = .39, 95% CI [.36, .42]) did not differ from the mother-daughter dyads (M = .36, 95% CI [.35, .37]) or from the mother-son dyads (M = .40, 95% CI [.39, .41]), but the mother-daughter and mother-son dyads did differ from one another (OR = 0.86, z ratio = -3.96, pTukey = 0.002). All interaction effects of dyad type with the linear and quadratic age effects were significant (see Table S1 below for the results of this model).

#### Table S1 {.unnumbered}

*Fixed Effects for Generalized Linear Mixed Model Predicting N_dlp/RP_N*

| Predictor | β | SE | z | p |
|-----------|---|----|----|---|
| Intercept | -0.485 | 0.031 | -15.59 | < .001 |
| Gender (Girls) | -0.141 | 0.041 | -3.47 | < .001 |
| Age (Linear) | 0.403 | 0.003 | 153.65 | < .001 |
| Age (Quadratic) | -0.568 | 0.003 | -172.69 | < .001 |
| Gender × Age (Linear) | -0.174 | 0.004 | -42.27 | < .001 |
| Gender × Age (Quadratic) | 0.346 | 0.005 | 73.28 | < .001 |

*Note.* Model: binomial family with logit link.
N = 86 observations, 13 subjects.

::: {.callout-note .callout-settingup collapse="true"}
## Detailed CRQA statistical analysis in R 🚩

{{< include ./code_STEP4_statisticalanalysis/STEP4_statisticalanalysis_CRQA.html >}}
:::

# Step 5: Masking

The open-source MaskAnyone toolbox (see DIY below) facilitates secure and reproducible de-identification of video data, which is crucial for behavioral analysis: it removes personally identifiable information to varying degrees while preserving movement dynamics. Built with React, FastAPI [@Ramirez_FastAPI], and Docker, this modular platform offers various configurable de-identification algorithms (blurring, suppression, silhouette rendering) and integrates segmentation models such as SAM, YOLOv8, and OpenPose/MediaPipe. Accessible via web and CLI, MaskAnyone supports scalable de-identification and privacy validation for researchers with varying technical expertise.

Our masking approach equips researchers to make context-sensitive masking decisions, which may involve varying levels of required reproducibility, or varying constraints such as cultural sensitivity and participant protection (see Table S2). Indeed, we think masking protocols should be applied through culturally aligned decision-making rather than indiscriminately. In our cross-cultural case study comparing Yurakaré sibling interactions with Polish lab sessions, we adopted masking strategies informed by participant consent, cultural and legal advisors, and recording flexibility, from which we learned that the notions of data being "identifiable" or "sensitive" may differ across communities and require nuance.
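To make one of the hiding strategies concrete, here is a minimal `numpy` sketch of pixelation as frame-by-frame block averaging. This is an illustration under our own assumptions (the `pixelate` helper and the toy frame are ours); MaskAnyone's actual implementations, including Gaussian blur and Canny-based contours, are more elaborate.

```python
import numpy as np

def pixelate(frame, block=8):
    """Mask an (H, W, C) frame by replacing each block x block patch with its mean color."""
    # crop so the frame divides evenly into blocks
    h = frame.shape[0] - frame.shape[0] % block
    w = frame.shape[1] - frame.shape[1] % block
    f = frame[:h, :w].astype(float).reshape(h // block, block, w // block, block, -1)
    means = f.mean(axis=(1, 3), keepdims=True)  # one mean per block (per channel)
    out = np.broadcast_to(means, f.shape).reshape(h, w, -1)
    return out.astype(frame.dtype)

# A toy 64x64 RGB "frame": pixelation removes fine detail but keeps coarse structure
frame = (np.arange(64 * 64 * 3) % 255).reshape(64, 64, 3).astype(np.uint8)
masked = pixelate(frame, block=16)
print(frame.shape, masked.shape)
```

Larger block sizes remove more identifying detail at the cost of movement resolution, which is the privacy-utility trade-off summarized in Table S2.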
Below is a video that shows the masking procedure we opted for.

<html><video width="600" height="400" controls><source src="./images/contoursvideosample.mp4" type="video/mp4"> Your browser does not support the video tag.</video></html>

The table below describes the three masking strategies that we explored.

*Table S2: MaskAnyone Strategy Elements (Hiding Approach)*

| Strategy | Description | Effectiveness | Illustrative Results |
|----------|-------------|---------------|---------------------|
| **Blur** | Applies Gaussian blurring at a user-controllable blur intensity level. | Moderate; may be limited depending on blur intensity. | Progressively blurred versions maintaining color and general structure |
| **Pixellation** | Reduces image resolution by grouping pixels into uniform blocks, creating a mosaic effect. | Moderate to high, depending on block size. | Blocky, mosaic-like appearance with preserved general shapes |
| **Contours** | Uses Canny edge detection to preserve essential contours while eliminating finer details. User-controlled detail. | Moderate; balances privacy with utility. | Black-and-white line drawings showing only structural boundaries |

*Note.* This table presents the three primary hiding strategies implemented in the MaskAnyone system, detailing their technical approach, privacy effectiveness, and expected visual outcomes.

## Ethical Considerations: Need for Integration in Research Ethics Protocols and Informed Consent

Ethical archiving answers calls to waste fewer resources in research and to increase reusability, reproducibility, and the ability of research-integrity entities to assess research quality [@resnikShouldResearchersDestroy2024]. Indeed, for that last purpose masking is relevant even when data are not openly shared; masking also enables more data archiving while lowering the chance of privacy leaks (e.g., when data is breached). Effective masking requires ongoing dialogue with research participants and communities.
We recommend establishing clear consent protocols that explain the masking options to participants. Institutions also need to regularly review the adequacy of privacy protection as technology evolves, and to maintain the flexibility to adjust approaches based on participant feedback. The goal is not merely technical anonymization, but culturally respectful privacy protection that enables meaningful research while honoring participant dignity and community values.

::: {.callout-note .callout-settingup collapse="true"}
## DIY: Trying masking yourself 🛠️

In earlier work of the masking project we used a lightweight masking approach based on Mediapipe [@lugaresiMediaPipeFrameworkBuilding2019], called MaskedPiper [@owoyeleMaskedPiperMaskingPersonal2022]. We have since provided an updated and enriched Python notebook that allows for masking single-person videos in several ways. You can find the code on www.envisionbox.org in the module ["Full-body tracking, +masking/blurring, and movement tracing (Python)"](https://envisionbox.org/embedded_Mediapiping.html).
:::

::: {.callout-note .callout-settingup collapse="true"}
## DIY: Trying masking yourself (Advanced) 🛠️

MaskAnyone is the most recent release of the masking project [@owoyeleMaskAnyoneToolkitOffering2024]. The MaskAnyone code is available on GitHub: [MaskAnyone](https://github.com/MaskAnyone/MaskAnyone), and an installation guide is available on the GitHub page.
:::

# References